From 97c8db93cd7cac53794108e87379db82ce301de7 Mon Sep 17 00:00:00 2001 From: wangxianbin1987 Date: Thu, 14 Apr 2016 18:54:28 +0800 Subject: [PATCH] KYLIN-1077 Support Hive View as Lookup Table Signed-off-by: wangxianbin1987 --- core-cube/pom.xml | 6 ++ .../java/org/apache/kylin/cube/CubeManager.java | 17 ++- core-dictionary/pom.xml | 6 ++ .../org/apache/kylin/dict/DictionaryManager.java | 23 +++- .../kylin/job/constant/ExecutableConstants.java | 1 + .../apache/kylin/metadata/MetadataConstants.java | 2 + .../org/apache/kylin/metadata/model/TableDesc.java | 9 ++ .../kylin/engine/mr/BatchCubingJobBuilder2.java | 2 +- .../apache/kylin/cube/ITDictionaryManagerTest.java | 11 +- .../org/apache/kylin/source/hive/HiveMRInput.java | 117 ++++++++++++++++++--- .../kylin/source/hive/HiveSourceTableLoader.java | 1 + 11 files changed, 170 insertions(+), 25 deletions(-) diff --git a/core-cube/pom.xml b/core-cube/pom.xml index f41b507..661fa59 100644 --- a/core-cube/pom.xml +++ b/core-cube/pom.xml @@ -44,6 +44,12 @@ kylin-core-dictionary ${project.parent.version} + + org.apache.hive + hive-metastore + ${hive.version} + provided + com.n3twork.druid diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java index 01b05da..72d6927 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java @@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentHashMap; import javax.annotation.Nullable; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.hive.metastore.TableType; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.KylinConfigExt; import org.apache.kylin.common.persistence.JsonSerializer; @@ -43,6 +44,7 @@ import org.apache.kylin.dict.lookup.LookupStringTable; import org.apache.kylin.dict.lookup.SnapshotManager; import org.apache.kylin.dict.lookup.SnapshotTable; import org.apache.kylin.dimension.Dictionary; +import org.apache.kylin.metadata.MetadataConstants; import org.apache.kylin.metadata.MetadataManager; import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.model.TableDesc; @@ -164,7 +166,7 @@ public class CubeManager implements IRealizationProvider { return null; DictionaryManager dictMgr = getDictionaryManager(); - DictionaryInfo dictInfo = dictMgr.buildDictionary(cubeDesc.getModel(),true, col, factTableValueProvider); + DictionaryInfo dictInfo = dictMgr.buildDictionary(cubeSeg, true, col, factTableValueProvider); if (dictInfo != null) { cubeSeg.putDictResPath(col, dictInfo.getResourcePath()); @@ -204,7 +206,14 @@ public class CubeManager implements IRealizationProvider { MetadataManager metaMgr = getMetadataManager(); SnapshotManager snapshotMgr = getSnapshotManager(); - TableDesc tableDesc = metaMgr.getTableDesc(lookupTable); + TableDesc tableDesc = new TableDesc(metaMgr.getTableDesc(lookupTable)); + String tableType = metaMgr.getTableDescExd(lookupTable).get(MetadataConstants.TABLE_EXD_TABLETYPE); + if (tableType.equals(TableType.VIRTUAL_VIEW.toString())) { + tableDesc.setDatabase(config.getHiveDatabaseForIntermediateTable()); + String tableName = generateHiveViewIntermediateTableName(tableDesc, cubeSeg.getCubeInstance().getName(), cubeSeg.getName()); + tableDesc.setName(tableName); + } + ReadableTable hiveTable = SourceFactory.createReadableTable(tableDesc); SnapshotTable snapshot = snapshotMgr.buildSnapshot(hiveTable, tableDesc); @@ -216,6 +225,10 @@ public class CubeManager implements IRealizationProvider { return snapshot; } + private String generateHiveViewIntermediateTableName(TableDesc tableDesc, String cubeName, String cubeSegmentName) { + return "kylin_hive_view_intermediate_" + cubeName + "_" + cubeSegmentName + "_" + tableDesc.getDatabase() + "_" + tableDesc.getName(); + } + // sync on update public CubeInstance dropCube(String cubeName, boolean deleteDesc) throws IOException { logger.info("Dropping cube '" + cubeName + "'"); diff --git a/core-dictionary/pom.xml b/core-dictionary/pom.xml index 17319e0..b2a1fe0 100644 --- a/core-dictionary/pom.xml +++ b/core-dictionary/pom.xml @@ -41,6 +41,12 @@ kylin-core-metadata ${project.parent.version} + + org.apache.hive + hive-metastore + ${hive.version} + provided + diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java index 015c79f..6d1d54d 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java @@ -20,14 +20,17 @@ package org.apache.kylin.dict; import com.google.common.cache.*; import com.google.common.collect.Lists; +import org.apache.hadoop.hive.metastore.TableType; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.dimension.Dictionary; +import org.apache.kylin.metadata.MetadataConstants; import org.apache.kylin.metadata.MetadataManager; import org.apache.kylin.metadata.datatype.DataType; import org.apache.kylin.metadata.model.DataModelDesc; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.model.TblColRef; +import org.apache.kylin.metadata.realization.IRealizationSegment; import org.apache.kylin.source.ReadableTable; import org.apache.kylin.source.ReadableTable.TableSignature; import org.apache.kylin.source.SourceFactory; @@ -260,9 +263,10 @@ public class DictionaryManager { } } - public DictionaryInfo buildDictionary(DataModelDesc model, boolean usingDict, TblColRef col, DistinctColumnValuesProvider factTableValueProvider) throws IOException { + public DictionaryInfo buildDictionary(IRealizationSegment seg, boolean usingDict, TblColRef col, DistinctColumnValuesProvider factTableValueProvider) throws IOException { logger.info("building dictionary for " + col); + DataModelDesc model = seg.getRealization().getDataModelDesc(); TblColRef srcCol = decideSourceData(model, usingDict, col); String srcTable = srcCol.getTable(); @@ -273,8 +277,17 @@ public class DictionaryManager { if (model.isFactTable(srcTable)) { inpTable = factTableValueProvider.getDistinctValuesFor(srcCol); } else { - TableDesc tableDesc = MetadataManager.getInstance(config).getTableDesc(srcTable); - inpTable = SourceFactory.createReadableTable(tableDesc); + MetadataManager metadataManager = MetadataManager.getInstance(config); + TableDesc tableDesc = new TableDesc(metadataManager.getTableDesc(srcTable)); + String tableType = metadataManager.getTableDescExd(srcTable).get(MetadataConstants.TABLE_EXD_TABLETYPE); + if (tableType.equals(TableType.VIRTUAL_VIEW.toString())) { + tableDesc.setDatabase(config.getHiveDatabaseForIntermediateTable()); + String tableName = generateHiveViewIntermediateTableName(tableDesc, seg.getRealization().getName(), seg.getName()); + tableDesc.setName(tableName); + inpTable = SourceFactory.createReadableTable(tableDesc); + } else { + inpTable = SourceFactory.createReadableTable(tableDesc); + } } TableSignature inputSig = inpTable.getSignature(); @@ -293,6 +306,10 @@ public class DictionaryManager { return trySaveNewDict(dictionary, dictInfo); } + private String generateHiveViewIntermediateTableName(TableDesc tableDesc, String cubeName, String cubeSegmentName) { + return "kylin_hive_view_intermediate_" + cubeName + "_" + cubeSegmentName + "_" + tableDesc.getDatabase() + "_" + tableDesc.getName(); + } + /** * Decide a dictionary's source data, leverage PK-FK relationship. */ diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java index f619a68..d47d550 100644 --- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java +++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java @@ -36,6 +36,7 @@ public final class ExecutableConstants { public static final String STEP_NAME_BUILD_DICTIONARY = "Build Dimension Dictionary"; public static final String STEP_NAME_CREATE_FLAT_HIVE_TABLE = "Create Intermediate Flat Hive Table"; + public static final String STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP = "Materialize Hive View in Lookup Tables"; public static final String STEP_NAME_FACT_DISTINCT_COLUMNS = "Extract Fact Table Distinct Columns"; public static final String STEP_NAME_BUILD_BASE_CUBOID = "Build Base Cuboid Data"; public static final String STEP_NAME_BUILD_IN_MEM_CUBE = "Build Cube"; diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataConstants.java b/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataConstants.java index a45a20f..4c404dc 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataConstants.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataConstants.java @@ -41,6 +41,8 @@ public interface MetadataConstants { public static final String TABLE_EXD_IF = "inputformat"; public static final String TABLE_EXD_PARTITIONED = "partitioned"; public static final String TABLE_EXD_TABLENAME = "tableName"; + //TableType in hive metastore, they are MANAGED_TABLE, EXTERNAL_TABLE, VIRTUAL_VIEW, INDEX_TABLE + public static final String TABLE_EXD_TABLETYPE = "tableType"; public static final String TABLE_EXD_OWNER = "owner"; public static final String TABLE_EXD_TFS = "totalFileSize"; public static final String TABLE_EXD_OF = "outputformat"; diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java index 65d85dd..7604df6 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java @@ -47,6 +47,15 @@ public class TableDesc extends RootPersistentEntity implements ISourceAware { private String identity = null; + public TableDesc() { + } + + public TableDesc(TableDesc other) { + this.name = other.getName(); + this.columns = other.getColumns(); + this.database.setName(other.getDatabase()); + } + public ColumnDesc findColumnByName(String name) { //ignore the db name and table name if exists int lastIndexOfDot = name.lastIndexOf("."); diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java index 0b1bd90..6745a20 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java @@ -52,7 +52,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport { final String jobId = result.getId(); final String cuboidRootPath = getCuboidRootPath(jobId); - // Phase 1: Create Flat Table + // Phase 1: Create Flat Table & Materialize Hive View in Lookup Tables inputSide.addStepPhase1_CreateFlatTable(result); // Phase 2: Build Dictionary diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/ITDictionaryManagerTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/ITDictionaryManagerTest.java index c1f2369..bc032dd 100644 --- a/kylin-it/src/test/java/org/apache/kylin/cube/ITDictionaryManagerTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/cube/ITDictionaryManagerTest.java @@ -35,6 +35,7 @@ import org.apache.kylin.dict.DistinctColumnValuesProvider; import org.apache.kylin.dimension.Dictionary; import org.apache.kylin.engine.mr.DFSFileTable; import org.apache.kylin.engine.mr.HadoopUtil; +import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.source.ReadableTable; import org.junit.After; @@ -60,15 +61,17 @@ public class ITDictionaryManagerTest extends LocalFileMetadataTestCase { @Test public void basic() throws Exception { dictMgr = DictionaryManager.getInstance(getTestConfig()); - CubeDesc cubeDesc = CubeDescManager.getInstance(getTestConfig()).getCubeDesc("test_kylin_cube_without_slr_desc"); + CubeInstance cubeInstance = CubeManager.getInstance(getTestConfig()).getCube("test_kylin_cube_without_slr_ready"); + CubeSegment cubeSegment = cubeInstance.getSegment("67f668f6-dcff-4cb6-a89b-77f1119df8fa", SegmentStatusEnum.READY); + CubeDesc cubeDesc = cubeSegment.getCubeDesc(); TblColRef col = cubeDesc.findColumnRef("DEFAULT.TEST_KYLIN_FACT", "LSTG_FORMAT_NAME"); MockDistinctColumnValuesProvider mockupData = new MockDistinctColumnValuesProvider("A", "B", "C"); - DictionaryInfo info1 = dictMgr.buildDictionary(cubeDesc.getModel(), true, col, mockupData); + DictionaryInfo info1 = dictMgr.buildDictionary(cubeSegment, true, col, mockupData); System.out.println(JsonUtil.writeValueAsIndentString(info1)); - DictionaryInfo info2 = dictMgr.buildDictionary(cubeDesc.getModel(), true, col, mockupData); + DictionaryInfo info2 = dictMgr.buildDictionary(cubeSegment, true, col, mockupData); System.out.println(JsonUtil.writeValueAsIndentString(info2)); // test check duplicate @@ -89,7 +92,7 @@ public class ITDictionaryManagerTest extends LocalFileMetadataTestCase { // test empty dictionary MockDistinctColumnValuesProvider mockupEmpty = new MockDistinctColumnValuesProvider(); - DictionaryInfo info3 = dictMgr.buildDictionary(cubeDesc.getModel(), true, col, mockupEmpty); + DictionaryInfo info3 = dictMgr.buildDictionary(cubeSegment, true, col, mockupEmpty); System.out.println(JsonUtil.writeValueAsIndentString(info3)); assertEquals(0, info3.getCardinality()); assertEquals(0, info3.getDictionaryObject().getSize()); diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java index 5242d76..ed31405 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java @@ -23,13 +23,16 @@ import java.io.IOException; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Job; import org.apache.hive.hcatalog.data.HCatRecord; import org.apache.hive.hcatalog.mapreduce.HCatInputFormat; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc; import org.apache.kylin.engine.mr.HadoopUtil; import org.apache.kylin.engine.mr.IMRInput; @@ -45,9 +48,12 @@ import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.DefaultChainedExecutable; import org.apache.kylin.job.execution.ExecutableContext; import org.apache.kylin.job.execution.ExecuteResult; +import org.apache.kylin.metadata.MetadataConstants; +import org.apache.kylin.metadata.MetadataManager; import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.realization.IRealizationSegment; +import sun.management.snmp.jvmmib.JvmOSMBean; public class HiveMRInput implements IMRInput { @@ -99,6 +105,7 @@ public class HiveMRInput implements IMRInput { final JobEngineConfig conf; final IRealizationSegment seg; final IJoinedFlatTableDesc flatHiveTableDesc; + String oldHiveViewIntermediateTables = ""; public BatchCubingInputSide(IRealizationSegment seg) { this.conf = new JobEngineConfig(KylinConfig.getInstanceFromEnv()); @@ -109,6 +116,7 @@ public class HiveMRInput implements IMRInput { @Override public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) { jobFlow.addTask(createFlatHiveTableStep(conf, flatHiveTableDesc, jobFlow.getId())); + jobFlow.addTask(createLookupHiveViewMaterializationStep(jobFlow.getId())); } public static AbstractExecutable createFlatHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId) { @@ -137,12 +145,46 @@ public class HiveMRInput implements IMRInput { return step; } + + public ShellExecutable createLookupHiveViewMaterializationStep(String jobId) { + ShellExecutable step = new ShellExecutable(); + HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(); + + KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + CubeManager cubeMgr = CubeManager.getInstance(kylinConfig); + MetadataManager metadataManager = MetadataManager.getInstance(kylinConfig); + String cubeName = seg.getRealization().getName(); + CubeDesc cubeDesc = cubeMgr.getCube(cubeName).getDescriptor(); + + final String useDatabaseHql = "USE " + conf.getConfig().getHiveDatabaseForIntermediateTable() + ";"; + hiveCmdBuilder.addStatement(useDatabaseHql); + for(TableDesc lookUpTableDesc : cubeDesc.getLookupTableDescs()) { + String tableType = metadataManager.getTableDescExd(lookUpTableDesc.getIdentity()).get(MetadataConstants.TABLE_EXD_TABLETYPE); + if (tableType.equals(TableType.VIRTUAL_VIEW.toString())) { + StringBuilder createIntermediateTableHql = new StringBuilder(); + createIntermediateTableHql.append("CREATE TABLE IF NOT EXISTS " + + generateHiveViewIntermediateTableName(lookUpTableDesc) + "\n"); + createIntermediateTableHql.append("LOCATION '" + JobBuilderSupport.getJobWorkingDir(conf, jobId) + "/" + + generateHiveViewIntermediateTableName(lookUpTableDesc) + "'\n"); + createIntermediateTableHql.append("AS SELECT * FROM " + lookUpTableDesc.getIdentity() + ";\n"); + hiveCmdBuilder.addStatement(createIntermediateTableHql.toString()); + oldHiveViewIntermediateTables = oldHiveViewIntermediateTables + generateHiveViewIntermediateTableName(lookUpTableDesc) + ";"; + } + oldHiveViewIntermediateTables= oldHiveViewIntermediateTables.substring(0, oldHiveViewIntermediateTables.length()-1); + } + + step.setCmd(hiveCmdBuilder.build()); + step.setName(ExecutableConstants.STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP); + return step; + } + @Override public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) { GarbageCollectionStep step = new GarbageCollectionStep(); step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION); step.setIntermediateTableIdentity(getIntermediateTableIdentity()); step.setExternalDataPath(JoinedFlatTable.getTableDir(flatHiveTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobFlow.getId()))); + step.setHiveViewIntermediateTableIdentitys(oldHiveViewIntermediateTables); jobFlow.addTask(step); } @@ -154,6 +196,10 @@ public class HiveMRInput implements IMRInput { private String getIntermediateTableIdentity() { return conf.getConfig().getHiveDatabaseForIntermediateTable() + "." + flatHiveTableDesc.getTableName(); } + + private String generateHiveViewIntermediateTableName(TableDesc tableDesc) { + return "kylin_hive_view_intermediate_" + seg.getRealization().getName() + "_" + seg.getName() + "_" + tableDesc.getDatabase() + "_" +tableDesc.getName(); + } } public static class GarbageCollectionStep extends AbstractExecutable { @@ -161,29 +207,62 @@ public class HiveMRInput implements IMRInput { protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { KylinConfig config = context.getConfig(); StringBuffer output = new StringBuffer(); + try { + output.append(cleanUpIntermediateFlatTable(config)); + output.append(cleanUpHiveViewIntermediateTable(config)); + } catch (IOException e) { + logger.error("job:" + getId() + " execute finished with exception", e); + return new ExecuteResult(ExecuteResult.State.ERROR, e.getMessage()); + } + return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString()); + } + + private String cleanUpIntermediateFlatTable(KylinConfig config) throws IOException { + StringBuffer output = new StringBuffer(); final String hiveTable = this.getIntermediateTableIdentity(); if (config.isHiveKeepFlatTable() == false && StringUtils.isNotEmpty(hiveTable)) { final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(); hiveCmdBuilder.addStatement("USE " + config.getHiveDatabaseForIntermediateTable() + ";"); hiveCmdBuilder.addStatement("DROP TABLE IF EXISTS " + hiveTable + ";"); - try { - config.getCliCommandExecutor().execute(hiveCmdBuilder.build()); - output.append("Hive table " + hiveTable + " is dropped. \n"); - - Path externalDataPath = new Path(getExternalDataPath()); - FileSystem fs = FileSystem.get(externalDataPath.toUri(), HadoopUtil.getCurrentConfiguration()); - if (fs.exists(externalDataPath)) { - fs.delete(externalDataPath, true); - output.append("Hive table " + hiveTable + " external data path " + externalDataPath + " is deleted. \n"); - } - } catch (IOException e) { - logger.error("job:" + getId() + " execute finished with exception", e); - return new ExecuteResult(ExecuteResult.State.ERROR, e.getMessage()); - } + + config.getCliCommandExecutor().execute(hiveCmdBuilder.build()); + output.append("Hive table " + hiveTable + " is dropped. \n"); + + rmdirOnHDFS(getExternalDataPath()); + output.append("Hive table " + hiveTable + " external data path " + getExternalDataPath() + " is deleted. \n"); } + return output.toString(); + } - return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString()); + private void mkdirOnHDFS(String path) throws IOException { + Path externalDataPath = new Path(path); + FileSystem fs = FileSystem.get(externalDataPath.toUri(), HadoopUtil.getCurrentConfiguration()); + if (!fs.exists(externalDataPath)) { + fs.mkdirs(externalDataPath); + } + } + + private void rmdirOnHDFS(String path) throws IOException { + Path externalDataPath = new Path(path); + FileSystem fs = FileSystem.get(externalDataPath.toUri(), HadoopUtil.getCurrentConfiguration()); + if (fs.exists(externalDataPath)) { + fs.delete(externalDataPath, true); + } + } + + private String cleanUpHiveViewIntermediateTable(KylinConfig config) throws IOException { + StringBuffer output = new StringBuffer(); + final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(); + hiveCmdBuilder.addStatement("USE " + config.getHiveDatabaseForIntermediateTable() + ";"); + if (!getHiveViewIntermediateTableIdentitys().isEmpty()) { + for(String hiveTableName : getHiveViewIntermediateTableIdentitys().split(";")) { + hiveCmdBuilder.addStatement("DROP TABLE IF EXISTS " + hiveTableName + ";"); + } + } + config.getCliCommandExecutor().execute(hiveCmdBuilder.build()); + output.append("hive view intermediate tables: " + getHiveViewIntermediateTableIdentitys() + " is dropped. \n"); + return output.toString(); } public void setIntermediateTableIdentity(String tableIdentity) { @@ -201,6 +280,14 @@ public class HiveMRInput implements IMRInput { private String getExternalDataPath() { return getParam("externalDataPath"); } + + public void setHiveViewIntermediateTableIdentitys(String tableIdentitys) { + setParam("oldHiveViewIntermediateTables", tableIdentitys); + } + + private String getHiveViewIntermediateTableIdentitys() { + return getParam("oldHiveViewIntermediateTables"); + } } } diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java index 2aef4e6..97c3bd3 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java @@ -147,6 +147,7 @@ public class HiveSourceTableLoader { map = Maps.newHashMap(); } map.put(MetadataConstants.TABLE_EXD_TABLENAME, table.getTableName()); + map.put(MetadataConstants.TABLE_EXD_TABLETYPE, table.getTableType()); map.put(MetadataConstants.TABLE_EXD_LOCATION, table.getSd().getLocation()); map.put(MetadataConstants.TABLE_EXD_IF, table.getSd().getInputFormat()); map.put(MetadataConstants.TABLE_EXD_OF, table.getSd().getOutputFormat()); -- 2.6.3.windows.1