From 9c69b5f3915b99fd2c55e26d8db64a5c2150e229 Mon Sep 17 00:00:00 2001
From: "hzfengyu@corp.netease.com"
Date: Thu, 3 Dec 2015 21:33:11 +0800
Subject: [PATCH 1/5] support multiple hives that depend on different hadoop
 clusters; add a config to decide whether to copy the intermediate table

Signed-off-by: hzfengyu@corp.netease.com
---
 .../java/org/apache/kylin/common/KylinConfig.java  |  34 +++-
 .../org/apache/kylin/common/util/HadoopUtil.java   |  98 ++++++++--
 .../org/apache/kylin/common/util/HiveClient.java   |  35 +++-
 .../org/apache/kylin/common/util/HiveManager.java  | 210 +++++++++++++++++++++
 .../apache/kylin/common/util/HadoopUtilTest.java   |   2 +-
 conf/kylin.properties                              |  30 ++-
 .../java/org/apache/kylin/cube/CubeInstance.java   |   1 +
 .../java/org/apache/kylin/cube/CubeManager.java    |   5 +-
 .../kylin/cube/cli/DictionaryGeneratorCLI.java     |   5 +-
 .../org/apache/kylin/dict/DictionaryManager.java   |  14 +-
 .../org/apache/kylin/dict/lookup/HiveTable.java    |  15 +-
 .../org/apache/kylin/job/AbstractJobBuilder.java   |  30 ++-
 .../java/org/apache/kylin/job/common/.gitignore    |   1 +
 .../kylin/job/common/DistcpShellExecutable.java    | 102 ++++++++++
 .../org/apache/kylin/job/common/HqlExecutable.java |  16 +-
 .../kylin/job/constant/ExecutableConstants.java    |   1 +
 .../apache/kylin/job/cube/CubingJobBuilder.java    |  59 ++++--
 .../apache/kylin/job/hadoop/AbstractHadoopJob.java |  23 ++-
 .../kylin/job/hadoop/cube/BaseCuboidMapper.java    |   2 +-
 .../apache/kylin/job/hadoop/cube/CubeHFileJob.java |   6 +-
 .../apache/kylin/job/hadoop/cube/CuboidJob.java    |  19 +-
 .../job/hadoop/cube/NewFactDistinctColumnsJob.java | 153 +++++++++++++++
 .../hadoop/cube/NewFactDistinctColumnsMapper.java  | 157 +++++++++++++++
 .../dict/CreateInvertedIndexDictionaryJob.java     |   3 +
 .../apache/kylin/job/hadoop/hbase/BulkLoadJob.java |   2 +
 .../kylin/job/hadoop/hbase/CreateHTableJob.java    |   3 +-
 .../test/java/org/apache/kylin/job/DeployUtil.java |   4 +-
 .../kylin/metadata/project/ProjectInstance.java    |  15 +-
 .../kylin/metadata/project/ProjectManager.java     |   8 +-
 .../kylin/metadata/tool/HiveSourceTableLoader.java |   9 +-
 .../kylin/rest/controller/CubeController.java      |   2 +
 .../kylin/rest/controller/ProjectController.java   |  13 +-
 .../kylin/rest/controller/TableController.java     |   2 +
 .../kylin/rest/request/CreateProjectRequest.java   |   8 +
 .../org/apache/kylin/rest/service/CubeService.java |   2 +
 .../org/apache/kylin/rest/service/JobService.java  |   4 +
 .../apache/kylin/rest/service/ProjectService.java  |   6 +-
 37 files changed, 1010 insertions(+), 89 deletions(-)
 create mode 100644 common/src/main/java/org/apache/kylin/common/util/HiveManager.java
 create mode 100644 job/src/main/java/org/apache/kylin/job/common/.gitignore
 create mode 100644 job/src/main/java/org/apache/kylin/job/common/DistcpShellExecutable.java
 create mode 100644 job/src/main/java/org/apache/kylin/job/hadoop/cube/NewFactDistinctColumnsJob.java
 create mode 100644 job/src/main/java/org/apache/kylin/job/hadoop/cube/NewFactDistinctColumnsMapper.java

diff --git a/common/src/main/java/org/apache/kylin/common/KylinConfig.java b/common/src/main/java/org/apache/kylin/common/KylinConfig.java
index 5ddb090..25800d8 100644
--- a/common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -78,9 +78,15 @@ public class KylinConfig {
     public static final String KYLIN_GET_JOB_STATUS_WITH_KERBEROS = "kylin.job.status.with.kerberos";
 
     public static final String KYLIN_UPLOAD_DEPENDENCIES_JARS = "kylin.job.upload.jars.enabled";
-
-    public static final String KYLIN_HBASE_CLUSTER_FS = "kylin.hbase.cluster.fs";
-
+
+    public static final String KYLIN_HBASE_CLUSTER_FS = "kylin.hbase.cluster.fs";
+
+    public static final String KYLIN_HIVE_ROOT_DIRECTORY = "kylin.hive.root.directory";
+
+    public static final String KYLIN_TRANSFORM_NAMESERVICE_ENABLE = "kylin.transform.nameservice.enable";
+
+    public static final String KYLIN_COPY_INTERMEDIATE_TABLE = "kylin.copy.intermediate.table.enable";
+
     /**
      * Toggle to indicate whether to use hive for table flattening. Default
      * true.
@@ -284,11 +290,6 @@ public class KylinConfig {
         }
         return root + getMetadataUrlPrefix() + "/";
     }
-
-    private static String HADOOP_JOB_DEPENDENCIES_JARS_DIR = "kylin-job-dependencies";
-    public String getHadoopDependencyJarsLocation() {
-        return getHdfsWorkingDirectory() + HADOOP_JOB_DEPENDENCIES_JARS_DIR;
-    }
 
     public String getKylinJobLogDir() {
         return getOptional(KYLIN_JOB_LOG_DIR, "/tmp/kylin/logs");
@@ -392,8 +393,8 @@ public class KylinConfig {
     public boolean isUploadJarsEnabled() {
         return Boolean.valueOf(getOptional(KYLIN_UPLOAD_DEPENDENCIES_JARS, "false"));
     }
-
-    public String getHBaseClusterFs() {
+
+    public String getHBaseClusterFs() {
         return getOptional(KYLIN_HBASE_CLUSTER_FS, "");
     }
@@ -660,6 +661,19 @@ public class KylinConfig {
     public String getProperty(String key, String defaultValue) {
         return kylinConfig.getString(key, defaultValue);
     }
+
+    public String getHiveRootDirectory() {
+        return kylinConfig.getString(KYLIN_HIVE_ROOT_DIRECTORY, null);
+    }
+
+    public boolean isTransformNameService() {
+        return Boolean.parseBoolean(getOptional(KYLIN_TRANSFORM_NAMESERVICE_ENABLE, "true"));
+    }
+
+    public boolean isCopyIntermediateTable() {
+        return Boolean.parseBoolean(getOptional(KYLIN_COPY_INTERMEDIATE_TABLE, "false"));
+    }
+
     /**
      * Set a new key:value into the kylin config.
diff --git a/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java b/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
index dc19504..5b17e78 100644
--- a/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
+++ b/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
@@ -20,9 +20,11 @@ package org.apache.kylin.common.util;
 
 import java.io.IOException;
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.UnknownHostException;
+import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -32,6 +34,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.kylin.common.KylinConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,11 +44,11 @@ public class HadoopUtil {
 
     private static ThreadLocal<Configuration> hadoopConfig = new ThreadLocal<>();
 
-    private static ThreadLocal<Configuration> hbaseConfig = new ThreadLocal<>();
-
+    private static ThreadLocal<Configuration> hbaseConfig = new ThreadLocal<>();
+
     public static void setCurrentConfiguration(Configuration conf) {
         hadoopConfig.set(conf);
-    }
+    }
 
     public static void setCurrentHBaseConfiguration(Configuration conf) {
         hbaseConfig.set(conf);
@@ -57,8 +60,8 @@ public class HadoopUtil {
         }
         return hadoopConfig.get();
     }
-
-    public static Configuration getCurrentHBaseConfiguration() {
+
+    public static Configuration getCurrentHBaseConfiguration() {
         if (hbaseConfig.get() == null) {
             Configuration configuration = HBaseConfiguration.create(new Configuration());
             String hbaseClusterFs = KylinConfig.getInstanceFromEnv().getHBaseClusterFs();
@@ -73,8 +76,8 @@ public class HadoopUtil {
     public static FileSystem getFileSystem(String path) throws IOException {
         return FileSystem.get(makeURI(path), getCurrentConfiguration());
     }
-
-    public static String makeQualifiedPathInHadoopCluster(String path) {
+
+    public static String makeQualifiedPathInHadoopCluster(String path) {
         try {
             FileSystem fs = FileSystem.get(getCurrentConfiguration());
             return fs.makeQualified(new Path(path)).toString();
@@ -82,7 +85,79 @@ public class HadoopUtil {
             throw new IllegalArgumentException("Cannot create FileSystem from current hadoop cluster conf", e);
         }
     }
-
+
+    public static Configuration convertCurrentConfig(String path) {
+        Configuration currentConfig = getCurrentConfiguration();
+        if(path == null)
+            return currentConfig;
+        String nameService = currentConfig.get(FileSystem.FS_DEFAULT_NAME_KEY);
+        logger.debug("Current convert path " + path);
+        logger.debug("Current default name service " + nameService);
+        try {
+            URI pathUri = new URI(path);
+            String host = pathUri.getHost();
+            //do not transform a path that already uses the default name service
+            if(nameService != null) {
+                URI defaultUri = new URI(nameService);
+                if(pathUri.getScheme().equalsIgnoreCase(defaultUri.getScheme()) && host.equalsIgnoreCase(defaultUri.getHost())) {
+                    return currentConfig;
+                }
+            }
+            //get the nameservice-to-real-namenode map
+            Map<String, Map<String, InetSocketAddress>> map = DFSUtil.getHaNnRpcAddresses(currentConfig);
+            Configuration tmpConfig = null;
+            Map<String, InetSocketAddress> addressesInNN = map.get(host);
+            //if this nameservice does not exist (e.g. the path already uses a real namenode), keep the current config
+            if(addressesInNN == null)
+                return currentConfig;
+            for(InetSocketAddress addr : addressesInNN.values()) {
+                String name = addr.getHostName();
+                int port = addr.getPort();
+                String target = String.format("hdfs://%s:%d/", name, port);
+                tmpConfig = new Configuration(currentConfig);
+                tmpConfig.set(FileSystem.FS_DEFAULT_NAME_KEY, target);
+                FileSystem tmpFs = FileSystem.get(tmpConfig);
+                try {
+                    //probe every real namenode, in case it is a standby server
+                    tmpFs.listFiles(new Path("/"), false);
+                    logger.debug("Transform path nameservice " + host + " to real name node " + target);
+                    return tmpConfig;
+                } catch (Exception e) {
+                    logger.warn(String.format("Hbase hadoop namenode %s, real host %s is a standby server!",
+                            nameService, target));
+                    continue;
+                }
+            }
+        } catch (IOException e) {
+            throw new IllegalArgumentException("Cannot create FileSystem from current hbase cluster conf", e);
+        } catch (URISyntaxException e1) {
+            throw new IllegalArgumentException("Cannot create path to URI", e1);
+        }
+        return currentConfig;
+    }
+
+    public static String transformHdfsPath(String path) {
+        //without a schema, use the default config
+        if(path == null || !path.startsWith("hdfs://"))
+            return path;
+        //check the config to decide whether to transform the path
+        if(!KylinConfig.getInstanceFromEnv().isTransformNameService())
+            return path;
+        try {
+            URI uri = new URI(path);
+            String rawPath = uri.getRawPath();
+            Configuration newConfig = convertCurrentConfig(path);
+            if(newConfig == null) {
+                newConfig = getCurrentConfiguration();
+            }
+            FileSystem fs = FileSystem.get(newConfig);
+            return fs.makeQualified(new Path(rawPath)).toString();
+        } catch (Exception e) {
+            logger.warn("Transform path " + path + " error!", e);
+            return null;
+        }
+    }
+
     public static String makeQualifiedPathInHBaseCluster(String path) {
         try {
             FileSystem fs = FileSystem.get(getCurrentHBaseConfiguration());
@@ -90,7 +165,8 @@ public class HadoopUtil {
         } catch (IOException e) {
             throw new IllegalArgumentException("Cannot create FileSystem from current hbase cluster conf", e);
         }
-}
+    }
+
     public static URI makeURI(String filePath) {
         try {
             return new URI(filePath);
@@ -152,8 +228,8 @@ public class HadoopUtil {
         conf.set(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "5");
         conf.set(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, "60000");
         // conf.set(ScannerCallable.LOG_SCANNER_ACTIVITY, "true");
-
-        String hbaseClusterFs = KylinConfig.getInstanceFromEnv().getHBaseClusterFs();
+
+        String hbaseClusterFs = KylinConfig.getInstanceFromEnv().getHBaseClusterFs();
         if (StringUtils.isNotEmpty(hbaseClusterFs)) {
             conf.set(FileSystem.FS_DEFAULT_NAME_KEY, hbaseClusterFs);
         }
diff --git a/common/src/main/java/org/apache/kylin/common/util/HiveClient.java b/common/src/main/java/org/apache/kylin/common/util/HiveClient.java
index 7f7a616..662ca2c 100644
--- a/common/src/main/java/org/apache/kylin/common/util/HiveClient.java
+++ b/common/src/main/java/org/apache/kylin/common/util/HiveClient.java
@@ -19,6 +19,8 @@ package org.apache.kylin.common.util;
 
 import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -46,12 +48,39 @@ public class HiveClient {
     protected Driver driver = null;
     protected HiveMetaStoreClient metaStoreClient = null;
     protected String type;
-
-    public HiveClient() {
+    private final static String LOCAL_FS_SCHEMA = "file://";
+
+    //create a hive client; if location is null use the default config, otherwise use the specified config file
+    public HiveClient(String location) {
+        URL uri = null;
+        if(location != null) {
+            try {
+                uri = new URL(LOCAL_FS_SCHEMA + location);
+            } catch (MalformedURLException e) {
+                throw new IllegalArgumentException("Can not find hive config file " + location);
+            }
+        }
+
+        //in HiveConf, hiveSiteURL is a static variable, so we have to hold a global lock
+        synchronized(HiveClient.class) {
+            if(uri != null) {
+                HiveConf.setHiveSiteLocation(uri);
+            }
+            hiveConf = new HiveConf(HiveClient.class);
+        }
+    }
+
+    //do not call this constructor any more
+    private HiveClient() {
         hiveConf = new HiveConf(HiveClient.class);
     }
+
+    public HiveClient(Map<String, String> configMap, String location) {
+        this(location);
+        appendConfiguration(configMap);
+    }
 
-    public HiveClient(Map<String, String> configMap) {
+    private HiveClient(Map<String, String> configMap) {
         this();
         appendConfiguration(configMap);
     }
diff --git a/common/src/main/java/org/apache/kylin/common/util/HiveManager.java b/common/src/main/java/org/apache/kylin/common/util/HiveManager.java
new file mode 100644
index 0000000..bf38c16
--- /dev/null
+++ b/common/src/main/java/org/apache/kylin/common/util/HiveManager.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// by hzfengyu
+package org.apache.kylin.common.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kylin.common.KylinConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HiveManager {
+    private static Map<KylinConfig, HiveManager> CACHE = new HashMap<KylinConfig, HiveManager>();
+    private static final Logger logger = LoggerFactory.getLogger(HiveManager.class);
+    private static final String DEFAULT_HIVE_NAME = "default";
+    private static final String HIVE_CONFIG_FILE_NAME = "hive-site.xml";
+    private static final String DEFAULT_HIVE_COMMAND = "hive";
+
+    //thread-local project name, specifying which project the current thread works on
+    public static final ThreadLocal<String> project = new ThreadLocal<String>();
+
+    //hive root directory; if it is null or empty, just use the default hive
+    private File hiveRootDir = null;
+    //never delete from this map, hive configs do not change!
+    private Map<String, HiveClient> hiveClientMap = null;
+    private HiveClient defaultHiveClient = null;
+
+    public static HiveManager getInstance() {
+        //using the default kylin config
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        HiveManager r = CACHE.get(config);
+        if (r == null) {
+            r = new HiveManager(config);
+            CACHE.put(config, r);
+            if (CACHE.size() > 1) {
+                logger.warn("More than one singleton HiveManager exist");
+            }
+
+        }
+        return r;
+    }
+
+    public String getCurrentProject() {
+        String projectName = project.get();
+        if(projectName == null) {
+            logger.error("Can not find the project parameter in the current thread:\n" + getCurrentThreadStack());
+        }
+        return projectName;
+    }
+
+    private String getCurrentThreadStack() {
+        StringBuffer sb = new StringBuffer();
+        StackTraceElement[] stackElements = Thread.currentThread().getStackTrace();
+        if (stackElements != null) {
+            for (int i = 0; i < stackElements.length; i++) {
+                sb.append("\t").append("at ").append(stackElements[i].getClassName()).append(".").append(stackElements[i].getMethodName()).
+                        append("(").append(stackElements[i].getFileName()).append(":").
+                        append(stackElements[i].getLineNumber()).append(")").append("\n");
+            }
+        }
+        return sb.toString();
+    }
+
+    public void setCurrentProject(String projectName) {
+        project.set(projectName);
+    }
+
+    private HiveManager(KylinConfig config) {
+        String rootDir = config.getHiveRootDirectory();
+        if(rootDir != null && !rootDir.isEmpty()) {
+            File file = new File(rootDir);
+            //check that the hive root directory exists
+            this.hiveRootDir = file;
+            if(!file.exists()) {
+                logger.warn("Hive root directory " + file.getAbsolutePath() + " does not exist!");
+                this.hiveRootDir = null;
+            }
+        } else {
+            this.hiveRootDir = null;
+        }
+        this.hiveClientMap = new HashMap<String, HiveClient>();
+    }
+
+    public List<String> getAllHiveName() {
+        List<String> hiveNames = new LinkedList<String>();
+        hiveNames.add(DEFAULT_HIVE_NAME);
+        if(this.hiveRootDir == null)
+            return hiveNames;
+
+        //every directory in the hive root dir is a hive source; take the directory name as the hive name
+        for(File file : this.hiveRootDir.listFiles()) {
+            if(!file.isDirectory()) {
+                logger.warn("File " + file.getAbsolutePath() + " in hive root directory is a normal file, skip it.");
+                continue;
+            }
+            hiveNames.add(file.getName());
+        }
+        return hiveNames;
+    }
+
+    private String getHiveConfigFile(String hiveName) {
+        if(this.hiveRootDir == null || hiveName == null)
+            return null;
+
+        File hiveRootFile = new File(this.hiveRootDir, hiveName);
+        if(!hiveRootFile.exists() || !hiveRootFile.isDirectory()) {
+            logger.warn("Hive " + hiveName + " root directory " + hiveRootFile.getAbsolutePath() + " does not exist.");
+            return null;
+        }
+        File hiveConfDir = new File(hiveRootFile, "conf");
+        if(!hiveConfDir.exists() || !hiveConfDir.isDirectory()) {
+            logger.warn("Hive " + hiveName + " config directory " + hiveConfDir.getAbsolutePath() + " does not exist.");
+            return null;
+        }
+        File hiveConfigFile = new File(hiveConfDir, HIVE_CONFIG_FILE_NAME);
+        if(!hiveConfigFile.exists() || !hiveConfigFile.isFile()) {
+            logger.warn("Hive " + hiveName + " config file " + hiveConfigFile.getAbsolutePath() + " does not exist.");
+            return null;
+        }
+        return hiveConfigFile.getAbsolutePath();
+    }
+
+    public HiveClient createHiveClient(String hiveName) throws IOException {
+        if(hiveName == null) {
+            if(defaultHiveClient == null)
+                defaultHiveClient = new HiveClient(null);
+            return this.defaultHiveClient;
+        }
+        if(this.hiveRootDir == null) {
+            throw new IOException("Hive " + hiveName + " is not supported, only the default hive is available");
+        }
+
+        HiveClient client = this.hiveClientMap.get(hiveName);
+        if(client != null)
+            return client;
+        String configFileLocation = getHiveConfigFile(hiveName);
+        if(configFileLocation == null) {
+            throw new IOException("Can not find the config file of hive " + hiveName + " in local hive root directory " + this.hiveRootDir.getAbsolutePath());
+        }
+        try {
+            client = new HiveClient(configFileLocation);
+            this.hiveClientMap.put(hiveName, client);
+            return client;
+        } catch (Exception e) {
+            throw new IOException("Can not create a hive client for " + hiveName + ", config file " + configFileLocation, e);
+        }
+    }
+
+    public HiveClient createHiveClient(Map<String, String> configMap, String hiveName) {
+        String configFileLocation = getHiveConfigFile(hiveName);
+
+        return new HiveClient(configMap, configFileLocation);
+    }
+
+    public String getHiveCommand(String hiveName) {
+        if(this.hiveRootDir == null || hiveName == null)
+            return null;
+
+        File hiveRootFile = new File(this.hiveRootDir, hiveName);
+        if(!hiveRootFile.exists() || !hiveRootFile.isDirectory()) {
+            logger.warn("Hive " + hiveName + " root directory " + hiveRootFile.getAbsolutePath() + " does not exist.");
+            return null;
+        }
+        File hiveBinDir = new File(hiveRootFile, "bin");
+        if(!hiveBinDir.exists() || !hiveBinDir.isDirectory()) {
+            logger.warn("Hive " + hiveName + " bin directory " + hiveBinDir.getAbsolutePath() + " does not exist.");
+            return null;
+        }
+        File hiveCmdFile = new File(hiveBinDir, DEFAULT_HIVE_COMMAND);
+        if(!hiveCmdFile.exists() || !hiveCmdFile.isFile()) {
+            logger.warn("Hive " + hiveName + " bin file " + hiveCmdFile.getAbsolutePath() + " does not exist.");
+            return null;
+        }
+        return hiveCmdFile.getAbsolutePath();
+    }
+
+    public String getHiveTableLocation(String tableName, String hiveName) throws Exception {
+        String[] tables = HadoopUtil.parseHiveTableName(tableName);
+
+        HiveClient hiveClient = createHiveClient(hiveName);
+        try {
+            String tableLocation = hiveClient.getHiveTableLocation(tables[0], tables[1]);
+            return tableLocation;
+        } catch (Exception e) {
+            logger.error("Failed to get the location of hive table " + tableName + "!");
+            throw e;
+        }
+    }
+}
diff --git a/common/src/test/java/org/apache/kylin/common/util/HadoopUtilTest.java b/common/src/test/java/org/apache/kylin/common/util/HadoopUtilTest.java
index 14fd0ee..e0ef20a 100644
--- a/common/src/test/java/org/apache/kylin/common/util/HadoopUtilTest.java
+++ b/common/src/test/java/org/apache/kylin/common/util/HadoopUtilTest.java
@@ -53,7 +53,7 @@
         Configuration conf = HadoopUtil.getCurrentHBaseConfiguration();
         assertEquals("hdfs://hbase-cluster/", conf.get(FileSystem.FS_DEFAULT_NAME_KEY));
     }
-+
+
     @Test
     public void testMakeQualifiedPathInHBaseCluster() throws Exception {
         KylinConfig config = KylinConfig.getInstanceFromEnv();
diff --git a/conf/kylin.properties b/conf/kylin.properties
index 5f4e37f..1b8ce5c 100644
--- a/conf/kylin.properties
+++ b/conf/kylin.properties
@@ -28,6 +28,20 @@ kylin.storage.url=hbase
 
 # Temp folder in hdfs, make sure user has the right access to the hdfs directory
 kylin.hdfs.working.dir=/kylin
+
+# HBase Cluster FileSystem, the one serving hbase, in the format hdfs://hbase-cluster/
+# leave empty if hbase runs on the same cluster as hive and mapreduce
+kylin.hbase.cluster.fs=
+
+# root directory holding all hive environments; each sub-directory's name is the name of that hive.
+kylin.hive.root.directory=
+
+# transform a nameservice into a real namenode, for when your hadoop cluster differs from the hdfs
+# that hbase/hive use and your hadoop cluster config does not know those nameservices.
+kylin.transform.nameservice.enable=
+
+# whether to copy the intermediate table from the hive hadoop cluster to the local hadoop cluster.
+kylin.copy.intermediate.table.enable=false
 
 kylin.job.mapreduce.default.reduce.input.mb=500
 
@@ -44,6 +58,12 @@ kylin.job.remote.cli.username=
 # Only necessary when kylin.job.run.as.remote.cmd=true
 kylin.job.remote.cli.password=
 
+# whether getting job status from the RM server uses kerberos authentication
+kylin.job.status.with.kerberos=
+
+# set it true if your hadoop cluster is deployed without the jars kylin depends on
+kylin.job.upload.jars.enabled=
+
 # Used by test cases to prepare synthetic data for sample cube
 kylin.job.remote.cli.working.dir=/tmp/kylin
 
@@ -115,13 +135,3 @@ ext.log.base.dir = /tmp/kylin_log1,/tmp/kylin_log2
 
 #will create external hive table to query result csv file
 #will set to kylin_query_log by default if not config here
 query.log.parse.result.table = kylin_query_log
-
-#if you should getting job status from RM with kerberos, set it true..
-kylin.job.status.with.kerberos=false - -#if you deploy hadoop cluster without jars which kylin dependency, set it true -kylin.job.upload.jars.enabled=false - -# HBase Cluster FileSystem, which serving hbase, format as hdfs://hbase-cluster/ -# leave empty if hbase running on same cluster with hive and mapreduce -kylin.hbase.cluster.fs= \ No newline at end of file diff --git a/cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/cube/src/main/java/org/apache/kylin/cube/CubeInstance.java index e1d2c47..cf0e9f0 100644 --- a/cube/src/main/java/org/apache/kylin/cube/CubeInstance.java +++ b/cube/src/main/java/org/apache/kylin/cube/CubeInstance.java @@ -89,6 +89,7 @@ public class CubeInstance extends RootPersistentEntity implements IRealization { @JsonProperty("retention_range") private long retentionRange = 0; + @JsonProperty("project_name") private String projectName; public List getBuildingSegments() { diff --git a/cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/cube/src/main/java/org/apache/kylin/cube/CubeManager.java index f92be9b..76d78bd 100644 --- a/cube/src/main/java/org/apache/kylin/cube/CubeManager.java +++ b/cube/src/main/java/org/apache/kylin/cube/CubeManager.java @@ -36,6 +36,7 @@ import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.persistence.Serializer; import org.apache.kylin.common.restclient.Broadcaster; import org.apache.kylin.common.restclient.CaseInsensitiveStringCache; +import org.apache.kylin.common.util.HiveManager; import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.model.CubeDesc; @@ -199,8 +200,10 @@ public class CubeManager implements IRealizationProvider { public SnapshotTable buildSnapshotTable(CubeSegment cubeSeg, String lookupTable) throws IOException { MetadataManager metaMgr = getMetadataManager(); SnapshotManager snapshotMgr = getSnapshotManager(); + KylinConfig config = KylinConfig.getInstanceFromEnv(); - HiveTable hiveTable = new HiveTable(metaMgr, lookupTable); + String hiveName = ProjectManager.getInstance(config).getProject(HiveManager.getInstance().getCurrentProject()).getHiveName(); + HiveTable hiveTable = new HiveTable(metaMgr, lookupTable, HiveManager.getInstance().createHiveClient(hiveName)); TableDesc tableDesc = metaMgr.getTableDesc(lookupTable); SnapshotTable snapshot = snapshotMgr.buildSnapshot(hiveTable, tableDesc); diff --git a/cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java b/cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java index d5608b0..ead9988 100644 --- a/cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java +++ b/cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java @@ -21,6 +21,7 @@ package org.apache.kylin.cube.cli; import java.io.IOException; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.HiveManager; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; @@ -36,8 +37,10 @@ public class DictionaryGeneratorCLI { public static void processSegment(KylinConfig config, String cubeName, String segmentName, String factColumnsPath) throws IOException { CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName); + //set thread local project name before use other hive + HiveManager.getInstance().setCurrentProject(cube.getProjectName()); CubeSegment segment = cube.getSegment(segmentName, SegmentStatusEnum.NEW); - + 
processSegment(config, segment, factColumnsPath); } diff --git a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java index 61e8e74..cb0b465 100644 --- a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java +++ b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java @@ -38,6 +38,7 @@ import org.apache.hadoop.fs.Path; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.util.HadoopUtil; +import org.apache.kylin.common.util.HiveManager; import org.apache.kylin.dict.lookup.FileTable; import org.apache.kylin.dict.lookup.HiveTable; import org.apache.kylin.dict.lookup.ReadableTable; @@ -45,6 +46,8 @@ import org.apache.kylin.dict.lookup.ReadableTable.TableSignature; import org.apache.kylin.metadata.MetadataManager; import org.apache.kylin.metadata.model.DataModelDesc; import org.apache.kylin.metadata.model.TblColRef; +import org.apache.kylin.metadata.project.ProjectInstance; +import org.apache.kylin.metadata.project.ProjectManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -262,7 +265,16 @@ public class DictionaryManager { if (model.isFactTable(col.getTable())) { table = new FileTable(factColumnsPath + "/" + col.getName(), -1); } else { - table = new HiveTable(metaMgr, col.getTable()); + KylinConfig config = KylinConfig.getInstanceFromEnv(); + String projectName = HiveManager.getInstance().getCurrentProject(); + String hiveName = null; + if(projectName != null) { + ProjectInstance projectInstance = ProjectManager.getInstance(config).getProject(projectName); + if(projectInstance != null) + hiveName = projectInstance.getHiveName(); + } + + table = new HiveTable(metaMgr, col.getTable(), HiveManager.getInstance().createHiveClient(hiveName)); } } // otherwise could refer to a data set, e.g. 
common_indicators.txt diff --git a/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTable.java b/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTable.java index 0fbb3c3..0d303e6 100644 --- a/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTable.java +++ b/dictionary/src/main/java/org/apache/kylin/dict/lookup/HiveTable.java @@ -22,9 +22,11 @@ import java.io.IOException; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.HiveClient; +import org.apache.kylin.common.util.HiveManager; import org.apache.kylin.common.util.Pair; import org.apache.kylin.metadata.MetadataManager; import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.metadata.project.ProjectManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,10 +41,15 @@ public class HiveTable implements ReadableTable { private HiveClient hiveClient; - public HiveTable(MetadataManager metaMgr, String table) { + public HiveTable(MetadataManager metaMgr, String table, HiveClient hiveClient) { TableDesc tableDesc = metaMgr.getTableDesc(table); this.database = tableDesc.getDatabase(); this.hiveTable = tableDesc.getName(); + this.hiveClient = hiveClient; + } + + public HiveTable(MetadataManager metaMgr, String table) { + this(metaMgr, table, null); } @Override @@ -94,9 +101,11 @@ public class HiveTable implements ReadableTable { return "hive: database=[" + database + "], table=[" + hiveTable + "]"; } - public HiveClient getHiveClient() { + public HiveClient getHiveClient() throws IOException { if (hiveClient == null) { - hiveClient = new HiveClient(); + //create hiveClient with current thread project name. + String hiveName = ProjectManager.getInstance(KylinConfig.getInstanceFromEnv()).getProject(HiveManager.getInstance().getCurrentProject()).getHiveName(); + hiveClient = HiveManager.getInstance().createHiveClient(hiveName); } return hiveClient; } diff --git a/job/src/main/java/org/apache/kylin/job/AbstractJobBuilder.java b/job/src/main/java/org/apache/kylin/job/AbstractJobBuilder.java index 4d258be..6624c57 100644 --- a/job/src/main/java/org/apache/kylin/job/AbstractJobBuilder.java +++ b/job/src/main/java/org/apache/kylin/job/AbstractJobBuilder.java @@ -20,13 +20,17 @@ package org.apache.kylin.job; import java.io.IOException; -import org.apache.hadoop.fs.FileSystem; -import org.apache.kylin.common.util.HadoopUtil; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.HiveManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.job.common.DistcpShellExecutable; +import org.apache.kylin.job.common.HadoopShellExecutable; import org.apache.kylin.job.common.ShellExecutable; import org.apache.kylin.job.constant.ExecutableConstants; import org.apache.kylin.job.engine.JobEngineConfig; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.hadoop.hive.IJoinedFlatTableDesc; +import org.apache.kylin.metadata.project.ProjectManager; public abstract class AbstractJobBuilder { @@ -73,9 +77,17 @@ public abstract class AbstractJobBuilder { throw new RuntimeException("Failed to generate insert data SQL for intermediate table."); } + String hiveName = ProjectManager.getInstance(KylinConfig.getInstanceFromEnv()).getProject(HiveManager.getInstance(). 
+                getCurrentProject()).getHiveName();
+        String hiveCmd = "hive";
+        if(hiveName != null) {
+            hiveCmd = HiveManager.getInstance().getHiveCommand(hiveName);
+            if(hiveCmd == null)
+                hiveCmd = "hive";
+        }
         ShellExecutable step = new ShellExecutable();
         StringBuffer buf = new StringBuffer();
-        buf.append("hive ");
+        buf.append(hiveCmd + " ");
         buf.append(" -e \"");
         buf.append(useDatabaseHql + "\n");
         buf.append(dropTableHql + "\n");
@@ -88,6 +100,18 @@
         return step;
     }
+
+
+    protected ShellExecutable createCopyHiveTableStep(CubeSegment seg, String intermediateHiveTableName, String output) {
+        DistcpShellExecutable copyHiveTableStep = new DistcpShellExecutable();
+        copyHiveTableStep.setName(ExecutableConstants.STEP_NAME_COPY_INTERMEDIATE_TABLE);
+        StringBuilder cmd = new StringBuilder();
+        copyHiveTableStep.setCubeName(seg.getCubeInstance().getName());
+        copyHiveTableStep.setOutputPath(output);
+        copyHiveTableStep.setTableName(intermediateHiveTableName);
+
+        return copyHiveTableStep;
+    }
 
     protected String getJobWorkingDir(String uuid) {
         return engineConfig.getHdfsWorkingDirectory() + "/" + JOB_WORKING_DIR_PREFIX + uuid;
diff --git a/job/src/main/java/org/apache/kylin/job/common/.gitignore b/job/src/main/java/org/apache/kylin/job/common/.gitignore
new file mode 100644
index 0000000..0b55fe0
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/common/.gitignore
@@ -0,0 +1 @@
+/HqlExecutable.java
diff --git a/job/src/main/java/org/apache/kylin/job/common/DistcpShellExecutable.java b/job/src/main/java/org/apache/kylin/job/common/DistcpShellExecutable.java
new file mode 100644
index 0000000..029abd1
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/common/DistcpShellExecutable.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job.common;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.HiveManager;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.project.ProjectManager;
+
+public class DistcpShellExecutable extends ShellExecutable {
+    private static final String CUBE_NAME = "cubename";
+    private static final String TABLE_NAME = "tablename";
+    private static final String OUTPUT_PATH = "output";
+
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        String cubeName = getCubeName().toUpperCase();
+        CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+        CubeInstance cubeInstance = cubeMgr.getCube(cubeName);
+
+        String tableName = getTableName();
+        String hiveName = ProjectManager.getInstance(KylinConfig.getInstanceFromEnv()).getProject(cubeInstance.getProjectName()).
+                getHiveName();
+        String input = null;
+        try {
+            input = HiveManager.getInstance().getHiveTableLocation(tableName, hiveName);
+            input = HadoopUtil.transformHdfsPath(input);
+        } catch (Exception e) {
+            logger.error("Can not get the location of hive table " + tableName + " using hive name " + hiveName);
+            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+        }
+        String output = getOutputPath();
+
+        logger.info("Copy Intermediate Hive Table input : " + input);
+        logger.info("Copy Intermediate Hive Table output : " + output);
+        //copy the hive table only when hive lives on a different hadoop cluster
+        if(hiveName == null) {
+            String inputPath = HadoopUtil.makeQualifiedPathInHadoopCluster(input);
+            String outputPath = HadoopUtil.makeQualifiedPathInHadoopCluster(output);
+            if(inputPath.equals(outputPath)) {
+                return new ExecuteResult(ExecuteResult.State.SUCCEED, null);
+            }
+        }
+        String cmd = String.format("hadoop distcp %s %s", input, output);
+        setCmd(cmd);
+
+        return super.doWork(context);
+    }
+
+    public void setCubeName(String name) {
+        setParam(CUBE_NAME, name);
+    }
+
+    public void setTableName(String name) {
+        setParam(TABLE_NAME, name);
+    }
+
+    public void setOutputPath(String output) {
+        setParam(OUTPUT_PATH, output);
+    }
+
+    public String getCubeName() {
+        return getParam(CUBE_NAME);
+    }
+
+    public String getTableName() {
+        return getParam(TABLE_NAME);
+    }
+
+    public String getOutputPath() {
+        return getParam(OUTPUT_PATH);
+    }
+
+    public String getParameters() {
+        StringBuffer buf = new StringBuffer();
+        buf.append(" -").append(CUBE_NAME).append(" ").append(getCubeName()).append(" -").append(TABLE_NAME).append(" ").
+ append(getTableName()).append(" -").append(OUTPUT_PATH).append(" ").append(getOutputPath()); + + return buf.toString(); + } +} diff --git a/job/src/main/java/org/apache/kylin/job/common/HqlExecutable.java b/job/src/main/java/org/apache/kylin/job/common/HqlExecutable.java index 6147bd6..1e41919 100644 --- a/job/src/main/java/org/apache/kylin/job/common/HqlExecutable.java +++ b/job/src/main/java/org/apache/kylin/job/common/HqlExecutable.java @@ -24,6 +24,7 @@ import java.util.Map; import org.apache.commons.lang.StringUtils; import org.apache.kylin.common.util.HiveClient; +import org.apache.kylin.common.util.HiveManager; import org.apache.kylin.common.util.JsonUtil; import org.apache.kylin.job.exception.ExecuteException; import org.apache.kylin.job.execution.AbstractExecutable; @@ -41,6 +42,7 @@ public class HqlExecutable extends AbstractExecutable { private static final String HQL = "hql"; private static final String HIVE_CONFIG = "hive-config"; + private static final String HIVE_ENV_NAME = "hive-name"; public HqlExecutable() { super(); @@ -50,7 +52,10 @@ public class HqlExecutable extends AbstractExecutable { protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { try { Map configMap = getConfiguration(); - HiveClient hiveClient = new HiveClient(configMap); + String hiveName = getHiveName(); +// HiveClient hiveClient = new HiveClient(configMap); + HiveClient hiveClient = HiveManager.getInstance().createHiveClient(hiveName); + hiveClient.appendConfiguration(configMap); for (String hql : getHqls()) { hiveClient.executeHQL(hql); @@ -101,5 +106,12 @@ public class HqlExecutable extends AbstractExecutable { return Collections.emptyList(); } } - + + public void setHiveName(String hiveName) { + setParam(HIVE_ENV_NAME, hiveName); + } + + private String getHiveName() { + return this.getParam(HIVE_ENV_NAME); + } } diff --git a/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java index cf5b112..1d7ab4f 100644 --- a/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java +++ b/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java @@ -43,6 +43,7 @@ public final class ExecutableConstants { public static final String STEP_NAME_BUILD_DICTIONARY = "Build Dimension Dictionary"; public static final String STEP_NAME_CREATE_FLAT_HIVE_TABLE = "Create Intermediate Flat Hive Table"; + public static final String STEP_NAME_COPY_INTERMEDIATE_TABLE = "Copy Intermediate table to local Hadoop"; public static final String STEP_NAME_FACT_DISTINCT_COLUMNS = "Extract Fact Table Distinct Columns"; public static final String STEP_NAME_BUILD_BASE_CUBOID = "Build Base Cuboid Data"; public static final String STEP_NAME_BUILD_N_D_CUBOID = "Build N-Dimension Cuboid Data"; diff --git a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java index dca4a10..d61d2f9 100644 --- a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java +++ b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java @@ -25,7 +25,7 @@ import java.util.List; import java.util.TimeZone; import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.fs.FileSystem; +import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.CubeSegment; @@ -40,6 +40,7 @@ import 
org.apache.kylin.job.hadoop.cube.CubeHFileJob; import org.apache.kylin.job.hadoop.cube.FactDistinctColumnsJob; import org.apache.kylin.job.hadoop.cube.MergeCuboidJob; import org.apache.kylin.job.hadoop.cube.NDCuboidJob; +import org.apache.kylin.job.hadoop.cube.NewFactDistinctColumnsJob; import org.apache.kylin.job.hadoop.cube.RangeKeyDistributionJob; import org.apache.kylin.job.hadoop.dict.CreateDictionaryJob; import org.apache.kylin.job.hadoop.hbase.BulkLoadJob; @@ -177,19 +178,27 @@ public final class CubingJobBuilder extends AbstractJobBuilder { final String jobId = result.getId(); final CubeJoinedFlatTableDesc intermediateTableDesc = new CubeJoinedFlatTableDesc(seg.getCubeDesc(), seg); final String intermediateHiveTableName = getIntermediateHiveTableName(intermediateTableDesc, jobId); - final String intermediateHiveTableLocation = getIntermediateHiveTableLocation(intermediateTableDesc, jobId); final String factDistinctColumnsPath = getFactDistinctColumnsPath(seg, jobId); final String[] cuboidOutputTempPath = getCuboidOutputPaths(cuboidRootPath, totalRowkeyColumnsCount, groupRowkeyColumnsCount); + String database = engineConfig.getConfig().getHiveDatabaseForIntermediateTable(); final AbstractExecutable intermediateHiveTableStep = createIntermediateHiveTableStep(intermediateTableDesc, jobId); result.addTask(intermediateHiveTableStep); - result.addTask(createFactDistinctColumnsStep(seg, intermediateHiveTableName, jobId)); + String fullTableName = database + "." + intermediateHiveTableName; + String intermediateHiveTableLocation = null; + if(KylinConfig.getInstanceFromEnv().isCopyIntermediateTable()) { + intermediateHiveTableLocation = this.getIntermediateHiveTableLocation(intermediateTableDesc, jobId); + result.addTask(createCopyHiveTableStep(seg, fullTableName, intermediateHiveTableLocation)); + } + //create fast distinct column job and base cuboid job using intermediate table,transform to location in run +// result.addTask(createFactDistinctColumnsStep(seg, intermediateHiveTableName, jobId)); + result.addTask(createNewFactDistinctColumnsStep(seg, fullTableName, jobId, intermediateHiveTableLocation)); result.addTask(createBuildDictionaryStep(seg, factDistinctColumnsPath)); // base cuboid step - final MapReduceExecutable baseCuboidStep = createBaseCuboidStep(seg, intermediateHiveTableLocation, cuboidOutputTempPath); + final MapReduceExecutable baseCuboidStep = createBaseCuboidStep(seg, fullTableName, cuboidOutputTempPath, intermediateHiveTableLocation); result.addTask(baseCuboidStep); // n dim cuboid steps @@ -205,9 +214,9 @@ public final class CubingJobBuilder extends AbstractJobBuilder { final String jobId = result.getId(); final String cuboidPath = cuboidRootPath + "*"; - result.addTask(createRangeRowkeyDistributionStep(seg, cuboidPath)); + result.addTask(createRangeRowkeyDistributionStep(seg, cuboidPath, jobId)); // create htable step - result.addTask(createCreateHTableStep(seg)); + result.addTask(createCreateHTableStep(seg, result.getId())); // generate hfiles step final MapReduceExecutable convertCuboidToHfileStep = createConvertCuboidToHfileStep(seg, cuboidPath, jobId); result.addTask(convertCuboidToHfileStep); @@ -264,8 +273,8 @@ public final class CubingJobBuilder extends AbstractJobBuilder { return getJobWorkingDir(seg.getLastBuildJobID()) + "/" + seg.getCubeInstance().getName() + "/cuboid/*"; } - private String getRowkeyDistributionOutputPath(CubeSegment seg) { - return getJobWorkingDir(seg.getLastBuildJobID()) + "/" + seg.getCubeInstance().getName() + "/rowkey_stats"; + 
private String getRowkeyDistributionOutputPath(CubeSegment seg, String uuid) { + return getJobWorkingDir(uuid) + "/" + seg.getCubeInstance().getName() + "/rowkey_stats"; } private String getFactDistinctColumnsPath(CubeSegment seg, String jobUuid) { @@ -290,6 +299,25 @@ public final class CubingJobBuilder extends AbstractJobBuilder { result.setMapReduceParams(cmd.toString()); return result; } + + private MapReduceExecutable createNewFactDistinctColumnsStep(CubeSegment seg, String intermediateHiveTableName, String jobId, + String intermediateHiveTableLocation) { + MapReduceExecutable result = new MapReduceExecutable(); + result.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS); + result.setMapReduceJobClass(NewFactDistinctColumnsJob.class); + StringBuilder cmd = new StringBuilder(); + appendMapReduceParameters(cmd, seg); + appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName()); + appendExecCmdParameters(cmd, "segmentname", seg.getName()); + appendExecCmdParameters(cmd, "output", getFactDistinctColumnsPath(seg, jobId)); + appendExecCmdParameters(cmd, "jobname", "Kylin_Fact_Distinct_Columns_" + seg.getCubeInstance().getName() + "_Step"); + appendExecCmdParameters(cmd, "tablename", intermediateHiveTableName); + if(intermediateHiveTableLocation != null) + appendExecCmdParameters(cmd, "input", intermediateHiveTableLocation); + + result.setMapReduceParams(cmd.toString()); + return result; + } private HadoopShellExecutable createBuildDictionaryStep(CubeSegment seg, String factDistinctColumnsPath) { // base cuboid job @@ -305,7 +333,8 @@ public final class CubingJobBuilder extends AbstractJobBuilder { return buildDictionaryStep; } - private MapReduceExecutable createBaseCuboidStep(CubeSegment seg, String intermediateHiveTableLocation, String[] cuboidOutputTempPath) { + private MapReduceExecutable createBaseCuboidStep(CubeSegment seg, String intermediateHiveTableName, String[] cuboidOutputTempPath, + String intermediateHiveTableLocation) { // base cuboid job MapReduceExecutable baseCuboidStep = new MapReduceExecutable(); @@ -316,7 +345,9 @@ public final class CubingJobBuilder extends AbstractJobBuilder { appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName()); appendExecCmdParameters(cmd, "segmentname", seg.getName()); - appendExecCmdParameters(cmd, "input", intermediateHiveTableLocation); + appendExecCmdParameters(cmd, "tablename", intermediateHiveTableName); + if(intermediateHiveTableLocation != null) + appendExecCmdParameters(cmd, "input", intermediateHiveTableLocation); appendExecCmdParameters(cmd, "output", cuboidOutputTempPath[0]); appendExecCmdParameters(cmd, "jobname", "Kylin_Base_Cuboid_Builder_" + seg.getCubeInstance().getName()); appendExecCmdParameters(cmd, "level", "0"); @@ -346,14 +377,14 @@ public final class CubingJobBuilder extends AbstractJobBuilder { return ndCuboidStep; } - private MapReduceExecutable createRangeRowkeyDistributionStep(CubeSegment seg, String inputPath) { + private MapReduceExecutable createRangeRowkeyDistributionStep(CubeSegment seg, String inputPath, String uuid) { MapReduceExecutable rowkeyDistributionStep = new MapReduceExecutable(); rowkeyDistributionStep.setName(ExecutableConstants.STEP_NAME_GET_CUBOID_KEY_DISTRIBUTION); StringBuilder cmd = new StringBuilder(); appendMapReduceParameters(cmd, seg); appendExecCmdParameters(cmd, "input", inputPath); - appendExecCmdParameters(cmd, "output", getRowkeyDistributionOutputPath(seg)); + appendExecCmdParameters(cmd, "output", getRowkeyDistributionOutputPath(seg, uuid)); 
appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName()); appendExecCmdParameters(cmd, "jobname", "Kylin_Region_Splits_Calculator_" + seg.getCubeInstance().getName() + "_Step"); @@ -362,12 +393,12 @@ public final class CubingJobBuilder extends AbstractJobBuilder { return rowkeyDistributionStep; } - private HadoopShellExecutable createCreateHTableStep(CubeSegment seg) { + private HadoopShellExecutable createCreateHTableStep(CubeSegment seg, String uuid) { HadoopShellExecutable createHtableStep = new HadoopShellExecutable(); createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE); StringBuilder cmd = new StringBuilder(); appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName()); - appendExecCmdParameters(cmd, "input", getRowkeyDistributionOutputPath(seg) + "/part-r-00000"); + appendExecCmdParameters(cmd, "input", getRowkeyDistributionOutputPath(seg, uuid) + "/part-r-00000"); appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier()); createHtableStep.setJobParams(cmd.toString()); diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java index f013d32..9db3ed3 100644 --- a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java +++ b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java @@ -74,8 +74,8 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { protected static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Cube name. For exmaple, flat_item_cube").create("cubename"); protected static final Option OPTION_II_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("II name. 
For exmaple, some_ii").create("iiname"); protected static final Option OPTION_SEGMENT_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Cube segment name)").create("segmentname"); - protected static final Option OPTION_TABLE_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Hive table name.").create("tablename"); - protected static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Input path").create("input"); + protected static final Option OPTION_TABLE_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(false).withDescription("Hive table name.").create("tablename"); + protected static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(false).withDescription("Input path").create("input"); protected static final Option OPTION_INPUT_FORMAT = OptionBuilder.withArgName("inputformat").hasArg().isRequired(false).withDescription("Input format").create("inputformat"); protected static final Option OPTION_INPUT_DELIM = OptionBuilder.withArgName("inputdelim").hasArg().isRequired(false).withDescription("Input delimeter").create("inputdelim"); protected static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Output path").create("output"); @@ -145,18 +145,19 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { logger.info("Hadoop job classpath is: " + job.getConfiguration().get(MAP_REDUCE_CLASSPATH)); return ; } - String hdfsLibDir = KylinConfig.getInstanceFromEnv().getHadoopDependencyJarsLocation(); + String hdfsLibDir = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "kylin-job-dependencies"; String normalClasspath = null; String jarClasspath = null; try { FileSystem fs = FileSystem.get(new Configuration(jobConf)); Path path = new Path(hdfsLibDir); if(!fs.exists(path)) { - //upload all files inly if the directory is empty, if it is not contain all jars may get error too... - if(!fs.mkdirs(path)) { - logger.warn("Create directory for uploading hadoop dependency jars failed , location " + hdfsLibDir); - return ; - } + fs.mkdirs(path); + } + + FileStatus[] fList = fs.listStatus(path); + //upload all files inly if the directory is empty, if it is not contain all jars may get error too... + if(fList.length == 0) { long start = System.currentTimeMillis(); uploadJobClasspath(classpath, hdfsLibDir); logger.info("Upload all dependency files to HDFS " + hdfsLibDir + ", cost " + (System.currentTimeMillis() - start)); @@ -164,7 +165,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { StringBuffer jarClasspathBuffer = new StringBuffer(); StringBuffer normalClasspathBuffer = new StringBuffer(); - FileStatus[] fList = fs.listStatus(path); + fList = fs.listStatus(path); for(FileStatus file : fList) { Path p = file.getPath(); if(p.getName().endsWith(".jar")) { @@ -176,8 +177,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { jarClasspath = jarClasspathBuffer.length() > 1 ? jarClasspathBuffer.substring(0, jarClasspathBuffer.length() - 1) : null; normalClasspath = normalClasspathBuffer.length() > 1 ? 
normalClasspathBuffer.substring(0, normalClasspathBuffer.length() - 1) : null; } catch (IOException e) { - logger.error("Upload all kylin job dependency file to HDFS failed !", e); - return ; + logger.error("Upload all kylin job dependency file to HDFS failed !"); } if(jarClasspath != null) { @@ -485,5 +485,4 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { public Job getJob() { return this.job; } - } diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapper.java index 5286fbc..08a87d6 100644 --- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapper.java +++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapper.java @@ -226,8 +226,8 @@ public class BaseCuboidMapper extends KylinMapper BatchConstants.ERROR_RECORD_THRESHOLD) { diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java index 4154d31..578d237 100644 --- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java +++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java @@ -31,6 +31,7 @@ import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.ToolRunner; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.job.constant.BatchConstants; @@ -55,8 +56,9 @@ public class CubeHFileJob extends AbstractHadoopJob { options.addOption(OPTION_OUTPUT_PATH); options.addOption(OPTION_HTABLE_NAME); parseOptions(options, args); - - Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH)); + + String outputPath = HadoopUtil.transformHdfsPath(getOptionValue(OPTION_OUTPUT_PATH)); + Path output = new Path(outputPath); String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase(); CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java index 2010b25..a2ef01a 100644 --- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java +++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CuboidJob.java @@ -32,6 +32,8 @@ import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.HadoopUtil; +import org.apache.kylin.common.util.HiveManager; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.cuboid.CuboidCLI; @@ -39,6 +41,7 @@ import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.job.constant.BatchConstants; import org.apache.kylin.job.exception.JobException; import org.apache.kylin.job.hadoop.AbstractHadoopJob; +import org.apache.kylin.metadata.project.ProjectManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,13 +64,13 @@ public class CuboidJob extends AbstractHadoopJob { options.addOption(OPTION_JOB_NAME); options.addOption(OPTION_CUBE_NAME); options.addOption(OPTION_SEGMENT_NAME); - options.addOption(OPTION_INPUT_PATH); + 
options.addOption(OPTION_TABLE_NAME); options.addOption(OPTION_OUTPUT_PATH); options.addOption(OPTION_NCUBOID_LEVEL); options.addOption(OPTION_INPUT_FORMAT); + options.addOption(OPTION_INPUT_PATH); parseOptions(options, args); - Path input = new Path(getOptionValue(OPTION_INPUT_PATH)); Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH)); String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase(); int nCuboidLevel = Integer.parseInt(getOptionValue(OPTION_NCUBOID_LEVEL)); @@ -76,7 +79,17 @@ public class CuboidJob extends AbstractHadoopJob { KylinConfig config = KylinConfig.getInstanceFromEnv(); CubeManager cubeMgr = CubeManager.getInstance(config); CubeInstance cube = cubeMgr.getCube(cubeName); - + + String inputPath = getOptionValue(OPTION_INPUT_PATH); + if(inputPath == null) { + String tableName = getOptionValue(OPTION_TABLE_NAME); + String hiveName = ProjectManager.getInstance(KylinConfig.getInstanceFromEnv()).getProject(cube.getProjectName()). + getHiveName(); + inputPath = HiveManager.getInstance().getHiveTableLocation(tableName, hiveName); + inputPath = HadoopUtil.transformHdfsPath(inputPath); + } + Path input = new Path(inputPath); + job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME)); logger.info("Starting: " + job.getJobName()); FileInputFormat.setInputPaths(job, input); diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewFactDistinctColumnsJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewFactDistinctColumnsJob.java new file mode 100644 index 0000000..7f1d2ff --- /dev/null +++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewFactDistinctColumnsJob.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +// by hzfengyu +package org.apache.kylin.job.hadoop.cube; + +import java.io.IOException; + +import org.apache.commons.cli.Options; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.ShortWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.hadoop.util.ToolRunner; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.HadoopUtil; +import org.apache.kylin.common.util.HiveManager; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.job.constant.BatchConstants; +import org.apache.kylin.job.hadoop.AbstractHadoopJob; +import org.apache.kylin.metadata.project.ProjectManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class NewFactDistinctColumnsJob extends AbstractHadoopJob { + protected static final Logger log = LoggerFactory.getLogger(NewFactDistinctColumnsJob.class); + + @Override + public int run(String[] args) throws Exception { + Options options = new Options(); + + try { + options.addOption(OPTION_JOB_NAME); + options.addOption(OPTION_CUBE_NAME); + options.addOption(OPTION_OUTPUT_PATH); + options.addOption(OPTION_TABLE_NAME); + options.addOption(OPTION_SEGMENT_NAME); + options.addOption(OPTION_INPUT_PATH); + options.addOption(OPTION_INPUT_FORMAT); + + parseOptions(options, args); + + job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME)); + Configuration jobConf = job.getConfiguration(); + + String cubeName = getOptionValue(OPTION_CUBE_NAME); + Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH)); + String segmentName = getOptionValue(OPTION_SEGMENT_NAME); + String inputPath = getOptionValue(OPTION_INPUT_PATH); + + // ---------------------------------------------------------------------------- + // add metadata to distributed cache + CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); + CubeInstance cubeInstance = cubeMgr.getCube(cubeName); + 
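// When -input is omitted, the intermediate table's location is looked up in + // the Hive instance bound to the cube's project (a null hiveName appears to + // select the default Hive), then the filesystem prefix is normalized.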
+ if (inputPath == null) { + String tableName = getOptionValue(OPTION_TABLE_NAME); + String hiveName = ProjectManager.getInstance(KylinConfig.getInstanceFromEnv()).getProject(cubeInstance.getProjectName()).getHiveName(); + inputPath = HiveManager.getInstance().getHiveTableLocation(tableName, hiveName); + inputPath = HadoopUtil.transformHdfsPath(inputPath); + } + Path input = new Path(inputPath); + + jobConf.set(BatchConstants.CFG_CUBE_NAME, cubeName); + + FileInputFormat.setInputPaths(job, input); + log.info("Starting: " + job.getJobName()); + + setJobClasspath(job); + + setupMapper(input); + setupReducer(output); + + job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName); + attachKylinPropsAndMetadata(cubeInstance, job.getConfiguration()); + + return waitForCompletion(job); + } catch (Exception e) { + logger.error("error in NewFactDistinctColumnsJob", e); + printUsage(options); + throw e; + } finally { + if (job != null) { + cleanupTempConfFile(job.getConfiguration()); + } + } + } + + private void setupMapper(Path intermediateTableLocation) throws IOException { + boolean isInputTextFormat = false; + if (hasOption(OPTION_INPUT_FORMAT) && ("textinputformat".equalsIgnoreCase(getOptionValue(OPTION_INPUT_FORMAT)))) { + isInputTextFormat = true; + } + + if (isInputTextFormat) { + job.setInputFormatClass(TextInputFormat.class); + } else { + job.setInputFormatClass(SequenceFileInputFormat.class); + } + job.setMapperClass(NewFactDistinctColumnsMapper.class); + job.setCombinerClass(FactDistinctColumnsCombiner.class); + job.setMapOutputKeyClass(ShortWritable.class); + job.setMapOutputValueClass(Text.class); + } + + private void setupReducer(Path output) throws IOException { + job.setReducerClass(FactDistinctColumnsReducer.class); + job.setOutputFormatClass(SequenceFileOutputFormat.class); + job.setOutputKeyClass(NullWritable.class); + job.setOutputValueClass(Text.class); + + FileOutputFormat.setOutputPath(job, output); + job.getConfiguration().set(BatchConstants.OUTPUT_PATH, output.toString()); + + job.setNumReduceTasks(1); + + deletePath(job.getConfiguration(), output); + } + 
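// Illustrative invocation (example argument values only; -input may be + // omitted so that the job resolves the table location from Hive): + // hadoop jar kylin-job.jar org.apache.kylin.job.hadoop.cube.NewFactDistinctColumnsJob + // -jobname FactDistinctColumns_my_cube -cubename my_cube -segmentname FULL_BUILD + // -tablename kylin_intermediate_my_cube -output /kylin/my_cube/fact_distinct_columns + 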
+ public static void main(String[] args) throws Exception { + NewFactDistinctColumnsJob job = new NewFactDistinctColumnsJob(); + int exitCode = ToolRunner.run(job, args); + System.exit(exitCode); + } + +} diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewFactDistinctColumnsMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewFactDistinctColumnsMapper.java new file mode 100644 index 0000000..41728f8 --- /dev/null +++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/NewFactDistinctColumnsMapper.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// added by hzfengyu +package org.apache.kylin.job.hadoop.cube; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.ShortWritable; +import org.apache.hadoop.io.Text; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.mr.KylinMapper; +import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.common.util.BytesSplitter; +import org.apache.kylin.common.util.SplittedBytes; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.cube.cuboid.Cuboid; +import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.cube.model.RowKeyDesc; +import org.apache.kylin.dict.DictionaryManager; +import org.apache.kylin.job.constant.BatchConstants; +import org.apache.kylin.job.hadoop.AbstractHadoopJob; +import org.apache.kylin.job.hadoop.hive.CubeJoinedFlatTableDesc; +import org.apache.kylin.metadata.model.SegmentStatusEnum; +import org.apache.kylin.metadata.model.TblColRef; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class NewFactDistinctColumnsMapper<KEYIN> extends KylinMapper<KEYIN, Text, ShortWritable, Text> { + private static final Logger logger = LoggerFactory.getLogger(NewFactDistinctColumnsMapper.class); + + private String cubeName; + private String segmentName; + private String intermediateTableRowDelimiter; + private byte byteRowDelimiter; + private CubeInstance cube; + private CubeDesc cubeDesc; + private int[] factDictCols; + private CubeSegment cubeSegment; + private int counter; + + private ShortWritable outputKey = new ShortWritable(); + private Text outputValue = new Text(); + private int errorRecordCounter; + + private BytesSplitter bytesSplitter; + private CubeJoinedFlatTableDesc intermediateTableDesc; + + @Override + protected void setup(Context context) throws IOException { + super.publishConfiguration(context.getConfiguration()); + Configuration conf = context.getConfiguration(); + cubeName = conf.get(BatchConstants.CFG_CUBE_NAME).toUpperCase(); + segmentName = conf.get(BatchConstants.CFG_CUBE_SEGMENT_NAME); + intermediateTableRowDelimiter = context.getConfiguration().get(BatchConstants.CFG_CUBE_INTERMEDIATE_TABLE_ROW_DELIMITER, Character.toString(BatchConstants.INTERMEDIATE_TABLE_ROW_DELIMITER)); + if (Bytes.toBytes(intermediateTableRowDelimiter).length > 1) { + throw new RuntimeException("Expected delimiter byte length is 1, but got " + Bytes.toBytes(intermediateTableRowDelimiter).length); + } + + byteRowDelimiter = Bytes.toBytes(intermediateTableRowDelimiter)[0]; + + KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(context.getConfiguration()); + + cube = CubeManager.getInstance(config).getCube(cubeName); + cubeDesc = cube.getDescriptor(); + cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW); + + intermediateTableDesc = new CubeJoinedFlatTableDesc(cube.getDescriptor(), cubeSegment); + bytesSplitter = new BytesSplitter(200, 4096); + + long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc); + Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId); + List<TblColRef> columns = baseCuboid.getColumns(); + + logger.info("Basecuboid columns : " + columns); + ArrayList<Integer> factDictCols = new ArrayList<Integer>(); + RowKeyDesc rowkey = cubeDesc.getRowkey(); + DictionaryManager dictMgr = DictionaryManager.getInstance(config);
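+ // Keep only the rowkey columns whose dictionary is sourced from the fact table: + // decideSourceData() names the source table for each dictionary column, and + // lookup-table columns are skipped because this mapper only scans fact rows.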
+ for (int i = 0; i < columns.size(); i++) { + TblColRef col = columns.get(i); + if (!rowkey.isUseDictionary(col)) + continue; + + String scanTable = (String) dictMgr.decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col, null)[0]; + if (cubeDesc.getModel().isFactTable(scanTable)) { + factDictCols.add(i); + } + } + logger.info("Fact dict columns : " + factDictCols); + this.factDictCols = new int[factDictCols.size()]; + for (int i = 0; i < factDictCols.size(); i++) + this.factDictCols[i] = factDictCols.get(i); + } + + @Override + public void map(KEYIN key, Text value, Context context) throws IOException, InterruptedException { + counter++; + if (counter % BatchConstants.COUNTER_MAX == 0) { + logger.info("Handled " + counter + " records!"); + } + + try { + bytesSplitter.split(value.getBytes(), value.getLength(), byteRowDelimiter); + intermediateTableDesc.sanityCheck(bytesSplitter); + + SplittedBytes[] columnValues = bytesSplitter.getSplitBuffers(); + int[] flatTableIndexes = intermediateTableDesc.getRowKeyColumnIndexes(); + SplittedBytes fieldValue = null; + for (int i : factDictCols) { + outputKey.set((short) i); + fieldValue = columnValues[flatTableIndexes[i]]; + if (fieldValue == null) + continue; + outputValue.set(fieldValue.value, 0, fieldValue.length); + context.write(outputKey, outputValue); + } + } catch (Exception ex) { + handleErrorRecord(bytesSplitter, ex); + } + } + + private void handleErrorRecord(BytesSplitter bytesSplitter, Exception ex) throws IOException { + + ex.printStackTrace(System.err); + System.err.println("Insane record: " + bytesSplitter); + + errorRecordCounter++; + if (errorRecordCounter > BatchConstants.ERROR_RECORD_THRESHOLD) { + if (ex instanceof IOException) + throw (IOException) ex; + else if (ex instanceof RuntimeException) + throw (RuntimeException) ex; + else + throw new RuntimeException("", ex); + } + } +} diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/dict/CreateInvertedIndexDictionaryJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/dict/CreateInvertedIndexDictionaryJob.java index f60313d..94943d4 100644 --- a/job/src/main/java/org/apache/kylin/job/hadoop/dict/CreateInvertedIndexDictionaryJob.java +++ b/job/src/main/java/org/apache/kylin/job/hadoop/dict/CreateInvertedIndexDictionaryJob.java @@ -21,6 +21,7 @@ package org.apache.kylin.job.hadoop.dict; import org.apache.commons.cli.Options; import org.apache.hadoop.util.ToolRunner; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.HiveManager; import org.apache.kylin.invertedindex.IIInstance; import org.apache.kylin.invertedindex.IIManager; import org.apache.kylin.job.hadoop.AbstractHadoopJob; @@ -46,6 +47,8 @@ public class CreateInvertedIndexDictionaryJob extends AbstractHadoopJob { IIManager mgr = IIManager.getInstance(config); IIInstance ii = mgr.getII(iiname); + // set the thread-local project name so HiveManager resolves the right Hive + HiveManager.getInstance().setCurrentProject(ii.getProjectName()); mgr.buildInvertedIndexDictionary(ii.getFirstSegment(), factColumnsInputPath); return 0; diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/BulkLoadJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/BulkLoadJob.java index 8c2faf9..0a2c2ba 100644 --- a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/BulkLoadJob.java +++ b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/BulkLoadJob.java @@ -61,6 +61,8 @@ public class BulkLoadJob extends AbstractHadoopJob { String input = getOptionValue(OPTION_INPUT_PATH); Configuration conf = HadoopUtil.getCurrentHBaseConfiguration(); + FileSystem fs = FileSystem.get(conf); + String cubeName = 
getOptionValue(OPTION_CUBE_NAME).toUpperCase(); KylinConfig config = KylinConfig.getInstanceFromEnv(); CubeManager cubeMgr = CubeManager.getInstance(config); diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java index 4a70d70..b8e57d4 100644 --- a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java +++ b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java @@ -82,6 +82,7 @@ public class CreateHTableJob extends AbstractHadoopJob { tableDesc.setValue(IRealizationConstants.HTableTag, config.getMetadataUrlPrefix()); Configuration conf = HadoopUtil.getCurrentHBaseConfiguration(); + Configuration hadoopConf = HadoopUtil.getCurrentConfiguration(); HBaseAdmin admin = new HBaseAdmin(conf); try { @@ -107,7 +108,7 @@ public class CreateHTableJob extends AbstractHadoopJob { tableDesc.addFamily(cf); } - byte[][] splitKeys = getSplits(conf, partitionFilePath); + byte[][] splitKeys = getSplits(hadoopConf, partitionFilePath); if (admin.tableExists(tableName)) { // admin.disableTable(tableName); diff --git a/job/src/test/java/org/apache/kylin/job/DeployUtil.java b/job/src/test/java/org/apache/kylin/job/DeployUtil.java index 41399c7..6f1ef3e 100644 --- a/job/src/test/java/org/apache/kylin/job/DeployUtil.java +++ b/job/src/test/java/org/apache/kylin/job/DeployUtil.java @@ -33,6 +33,7 @@ import org.apache.kylin.common.persistence.ResourceTool; import org.apache.kylin.common.util.AbstractKylinTestCase; import org.apache.kylin.common.util.CliCommandExecutor; import org.apache.kylin.common.util.HiveClient; +import org.apache.kylin.common.util.HiveManager; import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; @@ -215,7 +216,8 @@ public class DeployUtil { String tableFileDir = temp.getParent(); temp.delete(); - HiveClient hiveClient = new HiveClient(); + HiveClient hiveClient = HiveManager.getInstance().createHiveClient(null); // create hive tables hiveClient.executeHQL("CREATE DATABASE IF NOT EXISTS EDW"); diff --git a/metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java b/metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java index 71740ca..976a87c 100644 --- a/metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java +++ b/metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java @@ -70,6 +70,9 @@ public class ProjectInstance extends RootPersistentEntity { @JsonProperty("realizations") private List<RealizationEntry> realizationEntries; + 
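// Optional binding from a project to a named Hive instance, persisted in the + // project JSON; an illustrative serialized form: {"name": "my_project", "hiveName": "prod_hive", ...}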
+ @JsonProperty("hiveName") + private String hiveName; public String getResourcePath() { return concatResourcePath(name); @@ -86,13 +89,15 @@ return project.toUpperCase(); } - public static ProjectInstance create(String name, String owner, String description, List<RealizationEntry> realizationEntries) { + public static ProjectInstance create(String name, String owner, String description, String hiveName, List<RealizationEntry> realizationEntries) { ProjectInstance projectInstance = new ProjectInstance(); projectInstance.updateRandomUuid(); projectInstance.setName(name); projectInstance.setOwner(owner); projectInstance.setDescription(description); + if (hiveName != null) + projectInstance.setHiveName(hiveName); projectInstance.setStatus(ProjectStatusEnum.ENABLED); projectInstance.setCreateTimeUTC(System.currentTimeMillis()); if (realizationEntries != null) @@ -233,6 +238,14 @@ public class ProjectInstance extends RootPersistentEntity { public void setRealizationEntries(List<RealizationEntry> entries) { this.realizationEntries = entries; } + + public String getHiveName() { + return this.hiveName; + } + + public void setHiveName(String hiveName) { + this.hiveName = hiveName; + } public void init() { if (name == null) diff --git a/metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java b/metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java index fc74731..920ad25 100644 --- a/metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java +++ b/metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java @@ -155,12 +155,12 @@ public class ProjectManager { return projectMap.get(projectName); } - public ProjectInstance createProject(String projectName, String owner, String description) throws IOException { + public ProjectInstance createProject(String projectName, String owner, String description, String hiveName) throws IOException { logger.info("Creating project '" + projectName); ProjectInstance currentProject = getProject(projectName); if (currentProject == null) { - currentProject = ProjectInstance.create(projectName, owner, description, null); + currentProject = ProjectInstance.create(projectName, owner, description, hiveName, null); } else { throw new IllegalStateException("The project named " + projectName + "already exists"); } @@ -193,7 +193,7 @@ public class ProjectManager { public ProjectInstance updateProject(ProjectInstance project, String newName, String newDesc) throws IOException { if (!project.getName().equals(newName)) { - ProjectInstance newProject = this.createProject(newName, project.getOwner(), newDesc); + ProjectInstance newProject = this.createProject(newName, project.getOwner(), newDesc, null); // FIXME table lost?? 
newProject.setCreateTimeUTC(project.getCreateTimeUTC()); newProject.recordUpdateTime(System.currentTimeMillis()); @@ -225,7 +225,7 @@ String newProjectName = norm(project); ProjectInstance newProject = getProject(newProjectName); if (newProject == null) { - newProject = this.createProject(newProjectName, user, "This is a project automatically added when adding realization " + realizationName + "(" + type + ")"); + newProject = this.createProject(newProjectName, user, "This is a project automatically added when adding realization " + realizationName + "(" + type + ")", null); } newProject.addRealizationEntry(type, realizationName); saveResource(newProject); diff --git a/metadata/src/main/java/org/apache/kylin/metadata/tool/HiveSourceTableLoader.java b/metadata/src/main/java/org/apache/kylin/metadata/tool/HiveSourceTableLoader.java index 81b3830..3719d70 100644 --- a/metadata/src/main/java/org/apache/kylin/metadata/tool/HiveSourceTableLoader.java +++ b/metadata/src/main/java/org/apache/kylin/metadata/tool/HiveSourceTableLoader.java @@ -30,10 +30,12 @@ import org.apache.hadoop.hive.metastore.api.Table; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.common.util.HiveClient; +import org.apache.kylin.common.util.HiveManager; import org.apache.kylin.metadata.MetadataConstants; import org.apache.kylin.metadata.MetadataManager; import org.apache.kylin.metadata.model.ColumnDesc; import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.metadata.project.ProjectManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,12 +82,15 @@ public class HiveSourceTableLoader { } private static List<String> extractHiveTables(String database, Set<String> tables, KylinConfig config) throws IOException { - List<String> loadedTables = Lists.newArrayList(); MetadataManager metaMgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()); + String hiveName = ProjectManager.getInstance(KylinConfig.getInstanceFromEnv()).getProject(HiveManager.getInstance().getCurrentProject()). 
+ getHiveName(); + HiveClient hiveClient = HiveManager.getInstance().createHiveClient(hiveName); + for (String tableName : tables) { Table table = null; - HiveClient hiveClient = new HiveClient(); List<FieldSchema> partitionFields = null; List<FieldSchema> fields = null; try { diff --git a/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java index 8564a59..4887121 100644 --- a/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java +++ b/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java @@ -28,6 +28,7 @@ import java.util.UUID; import org.apache.commons.lang.StringUtils; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.HiveManager; import org.apache.kylin.common.util.JsonUtil; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeSegment; @@ -194,6 +195,7 @@ public class CubeController extends BasicController { try { String submitter = SecurityContextHolder.getContext().getAuthentication().getName(); CubeInstance cube = jobService.getCubeManager().getCube(cubeName); + HiveManager.getInstance().setCurrentProject(cube.getProjectName()); return jobService.submitJob(cube, jobBuildRequest.getStartTime(), jobBuildRequest.getEndTime(), // CubeBuildTypeEnum.valueOf(jobBuildRequest.getBuildType()), jobBuildRequest.isForceMergeEmptySegment(), submitter); } catch (JobException e) { diff --git a/server/src/main/java/org/apache/kylin/rest/controller/ProjectController.java b/server/src/main/java/org/apache/kylin/rest/controller/ProjectController.java index 5d5e558..69319a5 100644 --- a/server/src/main/java/org/apache/kylin/rest/controller/ProjectController.java +++ b/server/src/main/java/org/apache/kylin/rest/controller/ProjectController.java @@ -60,6 +60,12 @@ public class ProjectController extends BasicController { public List<ProjectInstance> getProjects(@RequestParam(value = "limit", required = false) Integer limit, @RequestParam(value = "offset", required = false) Integer offset) { return projectService.listAllProjects(limit, offset); } + 
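// Expose the Hive instance names Kylin is configured with so clients can pick + // one at project creation; e.g. GET /projects/hives under the API root might + // return ["default", "prod_hive"] (illustrative, deployment-specific values).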
+ @RequestMapping(value = "/hives", method = { RequestMethod.GET }) + @ResponseBody + public List<String> getHives() { + return projectService.getAllHiveNames(); + } @RequestMapping(value = "", method = { RequestMethod.POST }) @ResponseBody @@ -67,6 +73,12 @@ if (StringUtils.isEmpty(projectRequest.getName())) { throw new InternalErrorException("A project name must be given to create a project"); } + List<String> hiveNames = projectService.getAllHiveNames(); + String hiveName = projectRequest.getHiveName(); + if (hiveName != null && !hiveNames.contains(hiveName)) { + throw new InternalErrorException("A project must specify a valid hive name, available are " + + hiveNames + ", current " + hiveName); + } ProjectInstance createdProj = null; try { @@ -111,5 +123,4 @@ public void setProjectService(ProjectService projectService) { this.projectService = projectService; } - } diff --git a/server/src/main/java/org/apache/kylin/rest/controller/TableController.java b/server/src/main/java/org/apache/kylin/rest/controller/TableController.java index 04aa159..822f29f 100644 --- a/server/src/main/java/org/apache/kylin/rest/controller/TableController.java +++ b/server/src/main/java/org/apache/kylin/rest/controller/TableController.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import org.apache.commons.lang.StringUtils; +import org.apache.kylin.common.util.HiveManager; import org.apache.kylin.metadata.MetadataConstants; import org.apache.kylin.metadata.model.ColumnDesc; import org.apache.kylin.metadata.model.TableDesc; @@ -120,6 +121,7 @@ public class TableController extends BasicController { @ResponseBody public Map<String, String[]> loadHiveTable(@PathVariable String tables, @PathVariable String project) throws IOException { String submitter = SecurityContextHolder.getContext().getAuthentication().getName(); + HiveManager.getInstance().setCurrentProject(project); String[] loaded = cubeMgmtService.reloadHiveTable(tables); cubeMgmtService.calculateCardinalityIfNotPresent(loaded, submitter); cubeMgmtService.syncTableToProject(loaded, project); diff --git a/server/src/main/java/org/apache/kylin/rest/request/CreateProjectRequest.java b/server/src/main/java/org/apache/kylin/rest/request/CreateProjectRequest.java index 7c4bfb9..97fa8a5 100644 --- a/server/src/main/java/org/apache/kylin/rest/request/CreateProjectRequest.java +++ b/server/src/main/java/org/apache/kylin/rest/request/CreateProjectRequest.java @@ -24,6 +24,7 @@ package org.apache.kylin.rest.request; public class CreateProjectRequest { private String name; private String description; + private String hiveName; public CreateProjectRequest() { } @@ -44,4 +45,11 @@ this.description = description; } + public String getHiveName() { + return this.hiveName; + } + + public void setHiveName(String hiveName) { + this.hiveName = hiveName; + } } diff --git a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java index c3e6694..1268fce 100644 --- a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java +++ b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.client.HTable; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.HBaseRegionSizeCalculator; import org.apache.kylin.common.util.HadoopUtil; +import org.apache.kylin.common.util.HiveManager; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; @@ -515,6 +516,7 @@ public class CubeService extends BasicService { public CubeInstance rebuildLookupSnapshot(String cubeName, String segmentName, String lookupTable) throws IOException { CubeManager cubeMgr = getCubeManager(); CubeInstance cube = cubeMgr.getCube(cubeName); + HiveManager.getInstance().setCurrentProject(cube.getProjectName()); CubeSegment seg = cube.getSegment(segmentName, SegmentStatusEnum.READY); cubeMgr.buildSnapshotTable(seg, lookupTable); diff --git a/server/src/main/java/org/apache/kylin/rest/service/JobService.java b/server/src/main/java/org/apache/kylin/rest/service/JobService.java index ff61cc3..95884e4 100644 --- a/server/src/main/java/org/apache/kylin/rest/service/JobService.java +++ b/server/src/main/java/org/apache/kylin/rest/service/JobService.java @@ -31,6 +31,7 @@ import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.model.CubeBuildTypeEnum; import org.apache.kylin.job.JobInstance; +import org.apache.kylin.job.common.DistcpShellExecutable; import org.apache.kylin.job.common.HadoopShellExecutable; import org.apache.kylin.job.common.MapReduceExecutable; import org.apache.kylin.job.common.ShellExecutable; @@ -256,6 +257,9 @@ public class 
JobService extends BasicService { if (task instanceof ShellExecutable) { result.setExecCmd(((ShellExecutable) task).getCmd()); } + if (task instanceof DistcpShellExecutable) { + result.setExecCmd(((DistcpShellExecutable) task).getParameters()); + } if (task instanceof MapReduceExecutable) { result.setExecCmd(((MapReduceExecutable) task).getMapReduceParams()); result.setExecWaitTime(AbstractExecutable.getExtraInfoAsLong(stepOutput, MapReduceExecutable.MAP_REDUCE_WAIT_TIME, 0L) / 1000); diff --git a/server/src/main/java/org/apache/kylin/rest/service/ProjectService.java b/server/src/main/java/org/apache/kylin/rest/service/ProjectService.java index 581cabc..32ef343 100644 --- a/server/src/main/java/org/apache/kylin/rest/service/ProjectService.java +++ b/server/src/main/java/org/apache/kylin/rest/service/ProjectService.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.Collections; import java.util.List; +import org.apache.kylin.common.util.HiveManager; import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.metadata.project.ProjectManager; import org.apache.kylin.rest.constant.Constant; @@ -57,7 +58,7 @@ public class ProjectService extends BasicService { throw new InternalErrorException("The project named " + projectName + " already exists"); } String owner = SecurityContextHolder.getContext().getAuthentication().getName(); - ProjectInstance createdProject = getProjectManager().createProject(projectName, owner, description); + ProjectInstance createdProject = getProjectManager().createProject(projectName, owner, description, projectRequest.getHiveName()); accessService.init(createdProject, AclPermission.ADMINISTRATION); logger.debug("New project created."); @@ -116,4 +117,7 @@ ProjectManager.clearCache(); } + public List<String> getAllHiveNames() { + return HiveManager.getInstance().getAllHiveName(); + } } -- 1.9.4.msysgit.2