From d0dbdd58416d2950d7854cc01a940c6ec22fbaa3 Mon Sep 17 00:00:00 2001 From: terry-chelsea Date: Fri, 28 Oct 2016 20:45:11 +0800 Subject: [PATCH 2/2] KYLIN-1826, add and modify cube source job for external hive support Signed-off-by: terry-chelsea --- .../org/apache/kylin/common/KylinConfigBase.java | 1 + .../kylin/cube/model/CubeJoinedFlatTableDesc.java | 5 + .../cube/model/CubeJoinedFlatTableEnrich.java | 4 + .../java/org/apache/kylin/job/JoinedFlatTable.java | 10 +- .../kylin/job/constant/ExecutableConstants.java | 4 +- .../kylin/metadata/model/IJoinedFlatTableDesc.java | 2 + .../apache/kylin/metadata/model/ISourceAware.java | 1 + .../org/apache/kylin/metadata/model/TableDesc.java | 2 +- .../kylin/source/hive/ITHiveTableReaderTest.java | 2 +- .../kylin/rest/response/TableDescResponse.java | 1 + .../org/apache/kylin/rest/service/CubeService.java | 8 +- .../kylin/source/hive/CreateFlatHiveTableStep.java | 11 +- .../apache/kylin/source/hive/HiveCmdBuilder.java | 13 +- .../org/apache/kylin/source/hive/HiveMRInput.java | 87 ++++++--- .../org/apache/kylin/source/hive/HiveTable.java | 7 +- .../apache/kylin/source/hive/HiveTableReader.java | 16 +- .../apache/kylin/source/hive/HqlExecutable.java | 12 +- .../hive/external/DistcpShellExecutable.java | 100 ++++++++++ .../source/hive/external/ExternalHiveMRInput.java | 200 ++++++++++++++++++++ .../source/hive/external/ExternalHiveSource.java | 51 +++++ 20 files changed, 489 insertions(+), 48 deletions(-) create mode 100644 source-hive/src/main/java/org/apache/kylin/source/hive/external/DistcpShellExecutable.java create mode 100644 source-hive/src/main/java/org/apache/kylin/source/hive/external/ExternalHiveMRInput.java create mode 100644 source-hive/src/main/java/org/apache/kylin/source/hive/external/ExternalHiveSource.java diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 5063110..39c4a3e 100644 --- 
a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -721,6 +721,7 @@ abstract public class KylinConfigBase implements Serializable { Map r = convertKeyToInteger(getPropertiesByPrefix("kylin.source.engine.")); // ref constants in ISourceAware r.put(0, "org.apache.kylin.source.hive.HiveSource"); + r.put(6, "org.apache.kylin.source.hive.external.ExternalHiveSource"); return r; } diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java index 6aeb617..1e62ac9 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java @@ -161,5 +161,10 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc { public TblColRef getDistributedBy() { return cubeDesc.getDistributedByColumn(); } + + @Override + public String getHiveName() { + return this.getDataModel().getFactTableDesc().getHive(); + } } diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java index 5212859..566490e 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java @@ -137,4 +137,8 @@ public class CubeJoinedFlatTableEnrich implements IJoinedFlatTableDesc { return flatDesc.getDistributedBy(); } + @Override + public String getHiveName() { + return this.getDataModel().getFactTableDesc().getHive(); + } } diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java index 699d084..7f73393 100644 --- 
a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java +++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java @@ -141,12 +141,13 @@ public class JoinedFlatTable { return sql.toString(); } - public static String generateCountDataStatement(IJoinedFlatTableDesc flatDesc, final String outputDir) { + public static String generateCountDataStatement(IJoinedFlatTableDesc flatDesc, final String outputDir, boolean local) { final Map tableAliasMap = buildTableAliasMap(flatDesc.getDataModel()); final StringBuilder sql = new StringBuilder(); + String localStr = local ? "LOCAL" : ""; final String factTbl = flatDesc.getDataModel().getFactTable(); sql.append("dfs -mkdir -p " + outputDir + ";\n"); - sql.append("INSERT OVERWRITE DIRECTORY '" + outputDir + "' SELECT count(*) FROM " + factTbl + " " + tableAliasMap.get(factTbl) + "\n"); + sql.append("INSERT OVERWRITE " + localStr + " DIRECTORY '" + outputDir + "' SELECT count(*) FROM " + factTbl + " " + tableAliasMap.get(factTbl) + "\n"); appendWhereStatement(flatDesc, sql, tableAliasMap); return sql.toString(); } @@ -285,10 +286,11 @@ public class JoinedFlatTable { return hiveDataType.toLowerCase(); } - public static String generateSelectRowCountStatement(IJoinedFlatTableDesc intermediateTableDesc, String outputDir) { + public static String generateSelectRowCountStatement(IJoinedFlatTableDesc intermediateTableDesc, String outputDir, boolean local) { StringBuilder sql = new StringBuilder(); + String localStr = local ? 
"LOCAL" : ""; sql.append("set hive.exec.compress.output=false;\n"); - sql.append("INSERT OVERWRITE DIRECTORY '" + outputDir + "' SELECT count(*) FROM " + intermediateTableDesc.getTableName() + ";\n"); + sql.append("INSERT OVERWRITE " + localStr + " DIRECTORY '" + outputDir + "' SELECT count(*) FROM " + intermediateTableDesc.getTableName() + ";\n"); return sql.toString(); } diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java index 893c034..669e4a8 100644 --- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java +++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java @@ -35,7 +35,9 @@ public final class ExecutableConstants { public static final int DEFAULT_SCHEDULER_INTERVAL_SECONDS = 60; public static final String STEP_NAME_BUILD_DICTIONARY = "Build Dimension Dictionary"; - public static final String STEP_NAME_CREATE_FLAT_HIVE_TABLE = "Create Intermediate Flat Hive Table"; + public static final String STEP_NAME_CREATE_FLAT_HIVE_TABLE = "Create Intermediate Flat Hive Table in Local Hive"; + public static final String STEP_NAME_CREATE_FLAT_HIVE_TABLE_IN_DEFAULT = "Create Intermediate Flat Hive Table in Default Hive"; + public static final String STEP_NAME_COPY_HIVE_DATA = "Copy Intermediate Table To Local Hadoop"; public static final String STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP = "Materialize Hive View in Lookup Tables"; public static final String STEP_NAME_COUNT_HIVE_TABLE = "Count Source Table"; public static final String STEP_NAME_FACT_DISTINCT_COLUMNS = "Extract Fact Table Distinct Columns"; diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java index f3a4107..71c015a 100644 --- 
a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java @@ -37,4 +37,6 @@ public interface IJoinedFlatTableDesc { long getSourceOffsetEnd(); TblColRef getDistributedBy(); + // Determine hive name by data model fact table. + String getHiveName(); } diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java index 0f98d5d..b4030dd 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java @@ -23,6 +23,7 @@ public interface ISourceAware { public static final int ID_HIVE = 0; public static final int ID_STREAMING = 1; public static final int ID_SPARKSQL = 5; + public static final int ID_EXTERNAL_HIVE = 6; public static final int ID_EXTERNAL = 7; int getSourceType(); diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java index abd348d..bb9987e 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java @@ -238,7 +238,7 @@ public class TableDesc extends RootPersistentEntity implements ISourceAware { @Override public int getSourceType() { - return sourceType; + return hive == null ? 
sourceType : ISourceAware.ID_EXTERNAL_HIVE; /* a table bound to a named external hive is always ID_EXTERNAL_HIVE; otherwise keep the persisted sourceType so streaming and other non-hive tables are not reported as ID_HIVE */ } public void setSourceType(int sourceType) { diff --git a/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveTableReaderTest.java b/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveTableReaderTest.java index 757888e..53906dc 100644 --- a/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveTableReaderTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveTableReaderTest.java @@ -34,7 +34,7 @@ public class ITHiveTableReaderTest extends HBaseMetadataTestCase { @Test public void test() throws IOException { - HiveTableReader reader = new HiveTableReader("default", "test_kylin_fact"); + HiveTableReader reader = new HiveTableReader("default", "test_kylin_fact", null); int rowNumber = 0; while (reader.next()) { String[] row = reader.getRow(); diff --git a/server-base/src/main/java/org/apache/kylin/rest/response/TableDescResponse.java b/server-base/src/main/java/org/apache/kylin/rest/response/TableDescResponse.java index c3b1e7c..f041bf5 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/response/TableDescResponse.java +++ b/server-base/src/main/java/org/apache/kylin/rest/response/TableDescResponse.java @@ -76,6 +76,7 @@ public class TableDescResponse extends TableDesc { this.setName(table.getName()); this.setSourceType(table.getSourceType()); this.setUuid(table.getUuid()); + this.setHive(table.getHive()); } } diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java index e6db717..dcad40f 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java @@ -475,6 +475,10 @@ public class CubeService extends BasicService { logger.error("Cannot find table descirptor " + tableName, e); throw e; } + if(table.getHive() != null) { + logger.warn("Can not calculate cardinality 
for table {} which is loaded from external hive source !", tableName); + return; + } DefaultChainedExecutable job = new DefaultChainedExecutable(); job.setName("Hive Column Cardinality calculation for table '" + tableName + "'"); @@ -482,10 +486,6 @@ public class CubeService extends BasicService { String outPath = HiveColumnCardinalityJob.OUTPUT_PATH + "/" + tableName; String param = "-table " + tableName + " -output " + outPath; - if(table.getHive() != null) { - logger.warn("Can not calculate cardinality for tables which loaded from external hive source !"); - return; - } MapReduceExecutable step1 = new MapReduceExecutable(); diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java index bcb9a38..39cfcad 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java @@ -74,7 +74,8 @@ public class CreateFlatHiveTableStep extends AbstractExecutable { } private void createFlatHiveTable(KylinConfig config, int numReducers) throws IOException { - final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(); + final String hiveName = getHiveName(); + final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(hiveName); hiveCmdBuilder.addStatement(getInitStatement()); boolean useRedistribute = getUseRedistribute(); if (useRedistribute == true) { @@ -158,4 +159,12 @@ public class CreateFlatHiveTableStep extends AbstractExecutable { public String getRowCountOutputDir() { return getParam("rowCountOutputDir"); } + + public void setHiveName(String hiveName) { + setParam("hiveName", hiveName); + } + + public String getHiveName() { + return getParam("hiveName"); + } } diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveCmdBuilder.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveCmdBuilder.java index 
5a5b4e0..bde6e3b 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveCmdBuilder.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveCmdBuilder.java @@ -27,6 +27,7 @@ import java.util.ArrayList; import org.apache.commons.io.IOUtils; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.source.hive.external.HiveManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,10 +43,16 @@ public class HiveCmdBuilder { private HiveClientMode clientMode; private KylinConfig kylinConfig; final private ArrayList statements = Lists.newArrayList(); + protected String hiveName; public HiveCmdBuilder() { + this(null); + } + + public HiveCmdBuilder(String hive) { kylinConfig = KylinConfig.getInstanceFromEnv(); clientMode = HiveClientMode.valueOf(kylinConfig.getHiveClientMode().toUpperCase()); + this.hiveName = hive; } public String build() { @@ -53,13 +60,17 @@ public class HiveCmdBuilder { switch (clientMode) { case CLI: - buf.append("hive -e \""); + String hiveCommand = HiveManager.getInstance().getHiveCommand(this.hiveName); + buf.append(hiveCommand + " -e \""); for (String statement : statements) { buf.append(statement).append("\n"); } buf.append("\""); break; case BEELINE: + if(this.hiveName != null) { + throw new IllegalArgumentException("Can not use external hive with beeline mode to build cube currently."); + } BufferedWriter bw = null; try { File tmpHql = File.createTempFile("beeline_", ".hql"); diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java index 520d7cc..2ddd381 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java @@ -26,6 +26,7 @@ import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; 
+import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.mapreduce.Job; import org.apache.hive.hcatalog.data.HCatRecord; import org.apache.hive.hcatalog.mapreduce.HCatInputFormat; @@ -52,6 +53,7 @@ import org.apache.kylin.metadata.MetadataManager; import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; import org.apache.kylin.metadata.model.LookupDesc; import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.source.hive.external.ExternalHiveClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -85,13 +87,17 @@ public class HiveMRInput implements IMRInput { @Override public void configureJob(Job job) { - try { - HCatInputFormat.setInput(job, dbName, tableName); - job.setInputFormatClass(HCatInputFormat.class); - - job.setMapOutputValueClass(org.apache.hive.hcatalog.data.DefaultHCatRecord.class); - } catch (IOException e) { - throw new RuntimeException(e); + // HiveConf config file is static variable and ensure we set correct one. + synchronized(ExternalHiveClient.class) { + HiveConf.setHiveSiteLocation(Thread.currentThread().getContextClassLoader().getResource("hive-site.xml")); + try { + HCatInputFormat.setInput(job, dbName, tableName); + job.setInputFormatClass(HCatInputFormat.class); + + job.setMapOutputValueClass(org.apache.hive.hcatalog.data.DefaultHCatRecord.class); + } catch (IOException e) { + throw new RuntimeException(e); + } } } @@ -104,9 +110,9 @@ public class HiveMRInput implements IMRInput { public static class BatchCubingInputSide implements IMRBatchCubingInputSide { - final JobEngineConfig conf; - final IJoinedFlatTableDesc flatDesc; - String hiveViewIntermediateTables = ""; + protected final JobEngineConfig conf; + protected final IJoinedFlatTableDesc flatDesc; + protected String hiveViewIntermediateTables = ""; public BatchCubingInputSide(IJoinedFlatTableDesc flatDesc) { this.conf = new JobEngineConfig(KylinConfig.getInstanceFromEnv()); @@ -126,7 +132,7 @@ public class HiveMRInput implements 
IMRInput { jobFlow.addTask(createRedistributeFlatHiveTableStep(conf, flatDesc, jobFlow.getId(), cubeName)); } else if ("2".equals(createFlatTableMethod)) { // count from source table first, and then redistribute, suitable for partitioned table - final String rowCountOutputDir = JobBuilderSupport.getJobWorkingDir(conf, jobFlow.getId()) + "/row_count"; + final String rowCountOutputDir = getRowCountOutputDir(conf, jobFlow.getId()); jobFlow.addTask(createCountHiveTableStep(conf, flatDesc, jobFlow.getId(), rowCountOutputDir)); jobFlow.addTask(createFlatHiveTableStep(conf, flatDesc, jobFlow.getId(), cubeName, true, rowCountOutputDir)); } else { @@ -139,16 +145,25 @@ public class HiveMRInput implements IMRInput { } } - public static AbstractExecutable createRedistributeFlatHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId, String cubeName) { + protected String getRowCountOutputDir(JobEngineConfig conf, String jobId) { + return JobBuilderSupport.getJobWorkingDir(conf, jobId) + "/row_count"; + } + + protected boolean isWriteToLocalDir() { + return false; + } + + public AbstractExecutable createRedistributeFlatHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId, String cubeName) { StringBuilder hiveInitBuf = new StringBuilder(); hiveInitBuf.append("USE ").append(conf.getConfig().getHiveDatabaseForIntermediateTable()).append(";\n"); hiveInitBuf.append(JoinedFlatTable.generateHiveSetStatements(conf)); - String rowCountOutputDir = JobBuilderSupport.getJobWorkingDir(conf, jobId) + "/row_count"; + String rowCountOutputDir = getRowCountOutputDir(conf, jobId); RedistributeFlatHiveTableStep step = new RedistributeFlatHiveTableStep(); + step.setHiveName(flatTableDesc.getHiveName()); step.setInitStatement(hiveInitBuf.toString()); - step.setSelectRowCountStatement(JoinedFlatTable.generateSelectRowCountStatement(flatTableDesc, rowCountOutputDir)); + 
step.setSelectRowCountStatement(JoinedFlatTable.generateSelectRowCountStatement(flatTableDesc, rowCountOutputDir, isWriteToLocalDir())); step.setRowCountOutputDir(rowCountOutputDir); step.setRedistributeDataStatement(JoinedFlatTable.generateRedistributeFlatTableStatement(flatTableDesc)); CubingExecutableUtil.setCubeName(cubeName, step.getParams()); @@ -157,13 +172,14 @@ public class HiveMRInput implements IMRInput { } - public static AbstractExecutable createCountHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId, String rowCountOutputDir) { + public AbstractExecutable createCountHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId, String rowCountOutputDir) { final ShellExecutable step = new ShellExecutable(); - - final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(); + final String hiveName = flatTableDesc.getHiveName(); + + final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(hiveName); hiveCmdBuilder.addStatement(JoinedFlatTable.generateHiveSetStatements(conf)); hiveCmdBuilder.addStatement("set hive.exec.compress.output=false;\n"); - hiveCmdBuilder.addStatement(JoinedFlatTable.generateCountDataStatement(flatTableDesc, rowCountOutputDir)); + hiveCmdBuilder.addStatement(JoinedFlatTable.generateCountDataStatement(flatTableDesc, rowCountOutputDir, isWriteToLocalDir())); step.setCmd(hiveCmdBuilder.build()); step.setName(ExecutableConstants.STEP_NAME_COUNT_HIVE_TABLE); @@ -174,7 +190,7 @@ public class HiveMRInput implements IMRInput { public ShellExecutable createLookupHiveViewMaterializationStep(String jobId) { ShellExecutable step = new ShellExecutable(); step.setName(ExecutableConstants.STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP); - HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(); + HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(flatDesc.getHiveName()); KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); MetadataManager metadataManager = 
MetadataManager.getInstance(kylinConfig); @@ -221,6 +237,7 @@ public class HiveMRInput implements IMRInput { String insertDataHqls = JoinedFlatTable.generateInsertDataStatement(flatTableDesc, conf, redistribute); CreateFlatHiveTableStep step = new CreateFlatHiveTableStep(); + step.setHiveName(flatTableDesc.getHiveName()); step.setUseRedistribute(redistribute); step.setInitStatement(hiveInitBuf.toString()); step.setRowCountOutputDir(rowCountOutputDir); @@ -245,7 +262,7 @@ public class HiveMRInput implements IMRInput { return new HiveTableInputFormat(getIntermediateTableIdentity()); } - private String getIntermediateTableIdentity() { + protected String getIntermediateTableIdentity() { return conf.getConfig().getHiveDatabaseForIntermediateTable() + "." + flatDesc.getTableName(); } } @@ -254,7 +271,8 @@ public class HiveMRInput implements IMRInput { private final BufferedLogger stepLogger = new BufferedLogger(logger); private void computeRowCount(CliCommandExecutor cmdExecutor) throws IOException { - final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(); + final String hiveName = getHiveName(); + final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(hiveName); hiveCmdBuilder.addStatement(getInitStatement()); hiveCmdBuilder.addStatement("set hive.exec.compress.output=false;\n"); hiveCmdBuilder.addStatement(getSelectRowCountStatement()); @@ -282,7 +300,8 @@ public class HiveMRInput implements IMRInput { } private void redistributeTable(KylinConfig config, int numReducers) throws IOException { - final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(); + final String hiveName = getHiveName(); + final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(hiveName); hiveCmdBuilder.addStatement(getInitStatement()); hiveCmdBuilder.addStatement("set mapreduce.job.reduces=" + numReducers + ";\n"); hiveCmdBuilder.addStatement("set hive.merge.mapredfiles=false;\n"); @@ -369,6 +388,14 @@ public class HiveMRInput implements IMRInput { public String getRowCountOutputDir() { 
return getParam("rowCountOutputDir"); } + + public void setHiveName(String hiveName) { + setParam("hiveName", hiveName); + } + + public String getHiveName() { + return getParam("hiveName"); + } } public static class GarbageCollectionStep extends AbstractExecutable { @@ -390,7 +417,7 @@ public class HiveMRInput implements IMRInput { return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString()); } - private String cleanUpIntermediateFlatTable(KylinConfig config) throws IOException { + protected String cleanUpIntermediateFlatTable(KylinConfig config) throws IOException { StringBuffer output = new StringBuffer(); final String hiveTable = this.getIntermediateTableIdentity(); if (config.isHiveKeepFlatTable() == false && StringUtils.isNotEmpty(hiveTable)) { @@ -441,7 +468,7 @@ public class HiveMRInput implements IMRInput { setParam("oldHiveTable", tableIdentity); } - private String getIntermediateTableIdentity() { + protected String getIntermediateTableIdentity() { return getParam("oldHiveTable"); } @@ -449,7 +476,7 @@ public class HiveMRInput implements IMRInput { setParam("externalDataPath", externalDataPath); } - private String getExternalDataPath() { + protected String getExternalDataPath() { return getParam("externalDataPath"); } @@ -457,9 +484,17 @@ public class HiveMRInput implements IMRInput { setParam("oldHiveViewIntermediateTables", tableIdentities); } - private String getHiveViewIntermediateTableIdentities() { + protected String getHiveViewIntermediateTableIdentities() { return getParam("oldHiveViewIntermediateTables"); } + + public String getCmd() { /* renders this step's three params as " -<param key> <value>" pairs; each label is the key the matching getter reads */ + StringBuffer buf = new StringBuffer(); + buf.append(" -").append("oldHiveTable").append(" ").append(getIntermediateTableIdentity()).append(" -").append("externalDataPath").append(" "). 
+ append(getExternalDataPath()).append(" -").append("oldHiveViewIntermediateTables").append(" ").append(getHiveViewIntermediateTableIdentities()); + + return buf.toString(); + } } } diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTable.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTable.java index dcc43ff..9959ca9 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTable.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTable.java @@ -25,6 +25,7 @@ import org.apache.kylin.common.util.Pair; import org.apache.kylin.engine.mr.DFSFileTable; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.source.ReadableTable; +import org.apache.kylin.source.hive.external.HiveManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,17 +37,19 @@ public class HiveTable implements ReadableTable { final private String database; final private String hiveTable; + final private String hiveName; private HiveClient hiveClient; public HiveTable(TableDesc tableDesc) { + this.hiveName = tableDesc.getHive(); this.database = tableDesc.getDatabase(); this.hiveTable = tableDesc.getName(); } @Override public TableReader getReader() throws IOException { - return new HiveTableReader(database, hiveTable); + return new HiveTableReader(database, hiveTable, getHiveClient().getHiveConf()); } @Override @@ -86,7 +89,7 @@ public class HiveTable implements ReadableTable { public HiveClient getHiveClient() { if (hiveClient == null) { - hiveClient = new HiveClient(); + hiveClient = HiveManager.getInstance().createHiveClient(this.hiveName); } return hiveClient; } diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java index 3f9ce01..f6db33a 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java +++ 
b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java @@ -48,6 +48,7 @@ public class HiveTableReader implements TableReader { private HCatRecord currentHCatRecord; private int numberOfSplits = 0; private Map partitionKV = null; + private HiveConf hiveConf = null; /** * Constructor for reading whole hive table @@ -55,8 +56,8 @@ public class HiveTableReader implements TableReader { * @param tableName * @throws IOException */ - public HiveTableReader(String dbName, String tableName) throws IOException { - this(dbName, tableName, null); + public HiveTableReader(String dbName, String tableName, HiveConf hiveConf) throws IOException { + this(dbName, tableName, null, hiveConf); } /** @@ -66,16 +67,17 @@ public class HiveTableReader implements TableReader { * @param partitionKV key-value pairs condition on the partition * @throws IOException */ - public HiveTableReader(String dbName, String tableName, Map partitionKV) throws IOException { + public HiveTableReader(String dbName, String tableName, Map partitionKV, HiveConf hiveConf) throws IOException { this.dbName = dbName; this.tableName = tableName; this.partitionKV = partitionKV; + this.hiveConf = hiveConf; initialize(); } private void initialize() throws IOException { try { - this.readCntxt = getHiveReaderContext(dbName, tableName, partitionKV); + this.readCntxt = getHiveReaderContext(dbName, tableName, partitionKV, this.hiveConf); } catch (Exception e) { e.printStackTrace(); throw new IOException(e); @@ -146,8 +148,10 @@ public class HiveTableReader implements TableReader { return "hive table reader for: " + dbName + "." 
+ tableName; } - private static ReaderContext getHiveReaderContext(String database, String table, Map partitionKV) throws Exception { - HiveConf hiveConf = new HiveConf(HiveTableReader.class); + private static ReaderContext getHiveReaderContext(String database, String table, Map partitionKV, HiveConf hiveConf) + throws Exception { + if(hiveConf == null) + hiveConf = new HiveConf(HiveTableReader.class); Iterator> itr = hiveConf.iterator(); Map map = new HashMap(); while (itr.hasNext()) { diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HqlExecutable.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HqlExecutable.java index 79493a4..08fbcd6 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HqlExecutable.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HqlExecutable.java @@ -28,6 +28,7 @@ import org.apache.kylin.job.exception.ExecuteException; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableContext; import org.apache.kylin.job.execution.ExecuteResult; +import org.apache.kylin.source.hive.external.HiveManager; import org.datanucleus.store.types.backed.HashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,6 +43,7 @@ public class HqlExecutable extends AbstractExecutable { private static final Logger logger = LoggerFactory.getLogger(HqlExecutable.class); private static final String HQL = "hql"; + private static final String HIVE_NAME = "hive-name"; private static final String HIVE_CONFIG = "hive-config"; public HqlExecutable() { @@ -52,7 +54,8 @@ public class HqlExecutable extends AbstractExecutable { protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { try { Map configMap = getConfiguration(); - HiveClient hiveClient = new HiveClient(configMap); + String hiveName = getHiveName(); + HiveClient hiveClient = HiveManager.getInstance().createHiveClientWithConfig(configMap, hiveName); for (String hql : 
getHqls()) { hiveClient.executeHQL(hql); @@ -104,4 +107,11 @@ public class HqlExecutable extends AbstractExecutable { } } + public void setHiveName(String hiveName) { + setParam(HIVE_NAME, hiveName); + } + + private String getHiveName() { + return getParam(HIVE_NAME); + } } diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/external/DistcpShellExecutable.java b/source-hive/src/main/java/org/apache/kylin/source/hive/external/DistcpShellExecutable.java new file mode 100644 index 0000000..cedd571 --- /dev/null +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/external/DistcpShellExecutable.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ +package org.apache.kylin.source.hive.external; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.kylin.engine.mr.HadoopUtil; +import org.apache.kylin.job.common.ShellExecutable; +import org.apache.kylin.job.exception.ExecuteException; +import org.apache.kylin.job.execution.ExecutableContext; +import org.apache.kylin.job.execution.ExecuteResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DistcpShellExecutable extends ShellExecutable { + private static final Logger logger = LoggerFactory.getLogger(DistcpShellExecutable.class); + + private static final String HIVE_NAME = "hiveName"; + private static final String TABLE_NAME = "tableName"; + private static final String OUTPUT_PATH = "output"; + + @Override + protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { + String tableName = getTableName(); + String hiveName = getHiveName(); + String input = null; + String database = context.getConfig().getHiveDatabaseForIntermediateTable(); + try { + if(hiveName == null) { + return new ExecuteResult(ExecuteResult.State.SUCCEED, "The job is using default hive, do not need copy data!"); + } + input = HiveManager.getInstance().getHiveTableLocation(database, tableName, hiveName); + } catch (Exception e) { + logger.error("Failed to get location of hive table {}.{}, using hive name {}", database, tableName, hiveName, e); /* trailing Throwable after the three placeholder arguments is logged with its stack trace */ + return new ExecuteResult(ExecuteResult.State.ERROR , e.getLocalizedMessage()); + } + String output = getOutputPath(); + logger.info("Copy Intermediate Hive Table input : {} , output : {}", input, output); + /** + * Copy hive table only when source hive table location is in different hadoop cluster. 
+ */ + if(input.startsWith("/") || input.startsWith(HadoopUtil.getCurrentConfiguration().get(FileSystem.FS_DEFAULT_NAME_KEY))) { + return new ExecuteResult(ExecuteResult.State.SUCCEED, "Hive " + hiveName + " is based on default hadoop cluster, skip copy data ."); + } else { + Path inputPath = new Path(input); + input = inputPath.toString(); + } + String cmd = String.format("hadoop distcp -overwrite %s %s", input, output); + super.setCmd(cmd); + + return super.doWork(context); + } + + public void setHiveName(String name) { + setParam(HIVE_NAME, name); + } + + public void setTableName(String name) { + setParam(TABLE_NAME, name); + } + + public void setOutputPath(String output) { + setParam(OUTPUT_PATH, output); + } + + public String getOutputPath() { + return getParam(OUTPUT_PATH); + } + + public String getHiveName() { + return getParam(HIVE_NAME); + } + + public String getTableName() { + return getParam(TABLE_NAME); + } + + public String getExecCmd() { + StringBuffer buf = new StringBuffer(); + buf.append(" -").append(HIVE_NAME).append(" ").append(getHiveName()).append(" -").append(TABLE_NAME).append(" "). + append(getTableName()).append(" -").append(OUTPUT_PATH).append(" ").append(getOutputPath()); + + return buf.toString(); + } +} diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/external/ExternalHiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/external/ExternalHiveMRInput.java new file mode 100644 index 0000000..7f4706c --- /dev/null +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/external/ExternalHiveMRInput.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.source.hive.external;
+
+import java.io.IOException;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.job.JoinedFlatTable;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.source.hive.CreateFlatHiveTableStep;
+import org.apache.kylin.source.hive.HiveCmdBuilder;
+import org.apache.kylin.source.hive.HiveMRInput;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * MR input for a flat table that is built in an external hive. On top of the steps inherited
+ * from {@link HiveMRInput} it recreates the flat table in the default hive and copies the
+ * table data over with a {@link DistcpShellExecutable} step.
+ *
+ * @author hzfengyu
+ */
+public class ExternalHiveMRInput extends HiveMRInput {
+    /** Returns the cubing input side that adds the external-hive specific steps. */
+    @Override
+    public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+        return new BatchFileCubingInputSide(flatDesc);
+    }
+
+    /** Cubing input side whose flat table is produced by an external hive. */
+    public static class BatchFileCubingInputSide extends BatchCubingInputSide {
+        private static final Logger logger = LoggerFactory.getLogger(BatchFileCubingInputSide.class);
+
+        public BatchFileCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+            super(flatDesc);
+        }
+
+        /**
+         * Adds the inherited flat table steps, then a step that creates the same flat table in
+         * the default hive, then a step that copies the table data into the job working dir.
+         */
+        @Override
+        public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
+            super.addStepPhase1_CreateFlatTable(jobFlow);
+            final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
+
+            // Create the flat table in the default hive with the same definition as in the
+            // external hive, so that the following steps can use it. A view is not an option
+            // in the default hive, because kylin reads the lookup table files.
+            jobFlow.addTask(createFlatTableInDefaultHive(conf, flatDesc, jobFlow.getId(), cubeName));
+            AbstractExecutable copyDataStep = createCopyHiveDataStep(flatDesc.getTableName(), flatDesc.getHiveName(),
+                    JoinedFlatTable.getTableDir(flatDesc, JobBuilderSupport.getJobWorkingDir(conf, jobFlow.getId())));
+            // createCopyHiveDataStep is overridable, so a subclass may opt out by returning null.
+            if (copyDataStep != null) {
+                jobFlow.addTask(copyDataStep);
+            }
+        }
+
+        /**
+         * Builds a {@code file://} URI below the {@code java.io.tmpdir} directory ("/tmp" when
+         * the property is unset) for the row count output of the given job.
+         */
+        @Override
+        protected String getRowCountOutputDir(JobEngineConfig conf, String jobId) {
+            String tempDir = System.getProperty("java.io.tmpdir", "/tmp");
+            // NOTE(review): the last segment starts with "/", so the result contains "//row_count".
+            // Left as is because the consumers of this string are not visible here - confirm.
+            return String.format("file://%s/kylin-%s/%s", tempDir, jobId, "/row_count");
+        }
+
+        /** Always true; pairs with the {@code file://} row count directory above. */
+        @Override
+        protected boolean isWriteToLocalDir() {
+            return true;
+        }
+
+        /**
+         * Creates the step that copies the flat table data of the given hive to {@code output}.
+         *
+         * @param flatHiveTableName name of the flat table to copy
+         * @param hiveName name of the external hive that owns the table
+         * @param output destination path of the copy
+         * @return the configured {@link DistcpShellExecutable}
+         */
+        protected AbstractExecutable createCopyHiveDataStep(String flatHiveTableName, String hiveName, String output) {
+            DistcpShellExecutable copyHiveTableStep = new DistcpShellExecutable();
+            copyHiveTableStep.setName(ExecutableConstants.STEP_NAME_COPY_HIVE_DATA);
+            copyHiveTableStep.setHiveName(hiveName);
+            copyHiveTableStep.setOutputPath(output);
+            copyHiveTableStep.setTableName(flatHiveTableName);
+
+            return copyHiveTableStep;
+        }
+
+        /**
+         * Creates the step that drops and recreates the flat table in the intermediate-table
+         * database. Only DDL is issued: no redistribute and no row count output are configured.
+         */
+        protected AbstractExecutable createFlatTableInDefaultHive(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId, String cubeName) {
+            StringBuilder hiveInitBuf = new StringBuilder();
+            hiveInitBuf.append(JoinedFlatTable.generateHiveSetStatements(conf));
+
+            final String useDatabaseHql = "USE " + conf.getConfig().getHiveDatabaseForIntermediateTable() + ";\n";
+            final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatTableDesc);
+            final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobId));
+
+            CreateFlatHiveTableStep step = new CreateFlatHiveTableStep();
+            // NOTE(review): a null hive name presumably selects the default hive - confirm.
+            step.setHiveName(null);
+            step.setUseRedistribute(false);
+            step.setInitStatement(hiveInitBuf.toString());
+            step.setRowCountOutputDir(null);
+            step.setCreateTableStatement(useDatabaseHql + dropTableHql + createTableHql);
+            CubingExecutableUtil.setCubeName(cubeName, step.getParams());
+            step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE_IN_DEFAULT);
+            return step;
+        }
+
+        /**
+         * Adds a single {@link ExternalGarbageCollectionStep}; the inherited implementation is
+         * not called.
+         */
+        @Override
+        public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
+            String hiveName = flatDesc.getHiveName();
+
+            ExternalGarbageCollectionStep step = new ExternalGarbageCollectionStep();
+            step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
+            step.setIntermediateTableIdentity(getIntermediateTableIdentity());
+            step.setExternalDataPath(JoinedFlatTable.getTableDir(flatDesc, JobBuilderSupport.getJobWorkingDir(conf, jobFlow.getId())));
+            step.setHiveViewIntermediateTableIdentities(hiveViewIntermediateTables);
+            step.setHiveName(hiveName);
+            jobFlow.addTask(step);
+        }
+
+        /** Garbage collection that also drops the flat table kept in the external hive. */
+        public static class ExternalGarbageCollectionStep extends GarbageCollectionStep {
+            private static final String HIVE_NAME = "hiveName";
+
+            /**
+             * Runs the inherited flat table cleanup, then the external hive cleanup.
+             *
+             * @return SUCCEED with the collected messages, or ERROR when an IOException occurs
+             */
+            @Override
+            protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+                KylinConfig config = context.getConfig();
+                StringBuilder output = new StringBuilder();
+                try {
+                    output.append(super.cleanUpIntermediateFlatTable(config));
+                    output.append(cleanUpExternalHiveFlatTable(config));
+                    // don't drop view to avoid concurrent issue
+                    //output.append(cleanUpHiveViewIntermediateTable(config));
+                } catch (IOException e) {
+                    logger.error("job:" + getId() + " execute finished with exception", e);
+                    return new ExecuteResult(ExecuteResult.State.ERROR, e.getMessage());
+                }
+
+                return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
+            }
+
+            /**
+             * Deletes the data directory of the intermediate table in the external hive and
+             * drops the table there.
+             *
+             * @param config configuration that supplies the database name and the command executor
+             * @return a log of what was done; empty when no hive name or no table identity is set
+             * @throws IOException if executing the hive command fails with it
+             */
+            public String cleanUpExternalHiveFlatTable(KylinConfig config) throws IOException {
+                StringBuilder output = new StringBuilder();
+                String hiveName = this.getHiveName();
+                if (hiveName == null) {
+                    return output.toString();
+                }
+                final String hiveTable = getIntermediateTableIdentity();
+                if (StringUtils.isNotEmpty(hiveTable)) {
+                    final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(hiveName);
+                    hiveCmdBuilder.addStatement("USE " + config.getHiveDatabaseForIntermediateTable() + ";");
+                    hiveCmdBuilder.addStatement("DROP TABLE IF EXISTS " + hiveTable + ";");
+
+                    // Remove the table location before dropping the table: once the table is
+                    // gone its location can no longer be looked up.
+                    String rmExternalOutput = rmExternalTableDirOnHDFS(hiveTable, hiveName);
+                    config.getCliCommandExecutor().execute(hiveCmdBuilder.build());
+                    output.append("Hive table " + hiveTable + " is dropped. \n");
+                    output.append(rmExternalOutput);
+                }
+                return output.toString();
+            }
+
+            /**
+             * Best-effort removal of the table directory: every failure is logged and reported
+             * in the returned message instead of being thrown.
+             */
+            private String rmExternalTableDirOnHDFS(String tableName, String hive) throws IOException {
+                try {
+                    // NOTE(review): DistcpShellExecutable resolves locations with (database, table,
+                    // hive); here the table identity is passed without a database - confirm.
+                    String dir = HiveManager.getInstance().getHiveTableLocation(tableName, hive);
+                    Path path = new Path(dir);
+                    FileSystem fs = HadoopUtil.getFileSystem(dir);
+                    if (fs.exists(path)) {
+                        fs.delete(path, true);
+                    }
+                    return "Remove External Hive " + hive + " Table " + tableName + ", location " + path.toString() + "\n";
+                } catch (Exception e) {
+                    logger.warn("Get table location failed ! table {}, hive name {}.", tableName, hive, e);
+                    return "Fetch external table location or delete path failed. skip it.";
+                }
+            }
+
+            /** Stores the external hive name under the {@code hiveName} parameter. */
+            public void setHiveName(String hiveName) {
+                setParam(HIVE_NAME, hiveName);
+            }
+
+            /** @return the value of the {@code hiveName} parameter */
+            public String getHiveName() {
+                return getParam(HIVE_NAME);
+            }
+
+            /** Appends {@code " -hiveName VALUE"} to the inherited command string. */
+            @Override
+            public String getCmd() {
+                StringBuilder sb = new StringBuilder(super.getCmd());
+                sb.append(" -").append(HIVE_NAME).append(" ").append(this.getHiveName());
+                return sb.toString();
+            }
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/external/ExternalHiveSource.java b/source-hive/src/main/java/org/apache/kylin/source/hive/external/ExternalHiveSource.java
new file mode 100644
index 0000000..333b9e4
--- /dev/null
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/external/ExternalHiveSource.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.source.hive.external;
+
+import java.util.List;
+
+import org.apache.kylin.engine.mr.IMRInput;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.ISource;
+import org.apache.kylin.source.ReadableTable;
+import org.apache.kylin.source.hive.HiveTable;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Source implementation for tables of an external hive. It serves the MR build engine
+ * through {@link ExternalHiveMRInput} and reads tables through {@link HiveTable}.
+ */
+public class ExternalHiveSource implements ISource {
+
+    /**
+     * Adapts this source to the requested build engine interface.
+     *
+     * @param engineInterface the interface the build engine asks for
+     * @return a new {@link ExternalHiveMRInput} when {@code IMRInput} is requested
+     * @throws RuntimeException for every other interface
+     */
+    // The cast is safe: this branch is only reached when I is IMRInput.
+    @SuppressWarnings("unchecked")
+    @Override
+    public <I> I adaptToBuildEngine(Class<I> engineInterface) {
+        if (engineInterface == IMRInput.class) {
+            return (I) new ExternalHiveMRInput();
+        } else {
+            throw new RuntimeException("Cannot adapt to " + engineInterface);
+        }
+    }
+
+    /** Wraps the table description in a {@link HiveTable}. */
+    @Override
+    public ReadableTable createReadableTable(TableDesc tableDesc) {
+        return new HiveTable(tableDesc);
+    }
+
+    /** Returns a new empty list: no dependent resources are declared for this source. */
+    @Override
+    public List<String> getMRDependentResources(TableDesc table) {
+        return Lists.newArrayList();
+    }
+
+}
\ No newline at end of file
-- 
1.7.10.4