From 790a031acee278bcaeb2f3947d65ffe7a5368ddc Mon Sep 17 00:00:00 2001 From: "hzfengyu@corp.netease.com" Date: Tue, 8 Dec 2015 19:48:44 +0800 Subject: [PATCH 3/8] git common package part patch, KYLIN-1172 Signed-off-by: hzfengyu@corp.netease.com --- .../java/org/apache/kylin/common/KylinConfig.java | 31 ++- .../org/apache/kylin/common/util/HadoopUtil.java | 119 +++++++++++- .../org/apache/kylin/common/util/HiveClient.java | 37 +++- .../org/apache/kylin/common/util/HiveManager.java | 210 +++++++++++++++++++++ .../apache/kylin/common/util/HadoopUtilTest.java | 65 +++++++ conf/kylin.properties | 26 ++- 6 files changed, 473 insertions(+), 15 deletions(-) create mode 100644 common/src/main/java/org/apache/kylin/common/util/HiveManager.java create mode 100644 common/src/test/java/org/apache/kylin/common/util/HadoopUtilTest.java diff --git a/common/src/main/java/org/apache/kylin/common/KylinConfig.java b/common/src/main/java/org/apache/kylin/common/KylinConfig.java index d9c12a3..25800d8 100644 --- a/common/src/main/java/org/apache/kylin/common/KylinConfig.java +++ b/common/src/main/java/org/apache/kylin/common/KylinConfig.java @@ -78,6 +78,15 @@ public class KylinConfig { public static final String KYLIN_GET_JOB_STATUS_WITH_KERBEROS = "kylin.job.status.with.kerberos"; public static final String KYLIN_UPLOAD_DEPENDENCIES_JARS = "kylin.job.upload.jars.enabled"; + + public static final String KYLIN_HBASE_CLUSTER_FS = "kylin.hbase.cluster.fs"; + + public static final String KYLIN_HIVE_ROOT_DIRECTORY = "kylin.hive.root.directory"; + + public static final String KYLIN_TRANSFORM_NAMESERVICE_ENABLE = "kylin.transform.nameservice.enable"; + + public static final String KYLIN_COPY_INTERMEDIATE_TABLE = "kylin.copy.intermediate.table.enable"; + /** * Toggle to indicate whether to use hive for table flattening. Default * true. @@ -281,11 +290,6 @@ public class KylinConfig { } return root + getMetadataUrlPrefix() + "/"; } - - private static String HADOOP_JOB_DEPENDENCIES_JARS_DIR = "kylin-job-dependencies"; - public String getHadoopDependencyJarsLocation() { - return getHdfsWorkingDirectory() + HADOOP_JOB_DEPENDENCIES_JARS_DIR; - } public String getKylinJobLogDir() { return getOptional(KYLIN_JOB_LOG_DIR, "/tmp/kylin/logs"); @@ -390,6 +394,10 @@ public class KylinConfig { return Boolean.valueOf(getOptional(KYLIN_UPLOAD_DEPENDENCIES_JARS, "false")); } + public String getHBaseClusterFs() { + return getOptional(KYLIN_HBASE_CLUSTER_FS, ""); + } + public String getOverrideHiveTableLocation(String table) { return getOptional(HIVE_TABLE_LOCATION_PREFIX + table.toUpperCase()); } @@ -653,6 +661,19 @@ public class KylinConfig { public String getProperty(String key, String defaultValue) { return kylinConfig.getString(key, defaultValue); } + + public String getHiveRootDirectory() { + return kylinConfig.getString(KYLIN_HIVE_ROOT_DIRECTORY, null); + } + + public boolean isTransformNameService() { + return Boolean.parseBoolean(getOptional(KYLIN_TRANSFORM_NAMESERVICE_ENABLE, "true")); + } + + public boolean isCopyIntermediateTable() { + return Boolean.parseBoolean(getOptional(KYLIN_COPY_INTERMEDIATE_TABLE, "false")); + } + /** * Set a new key:value into the kylin config. diff --git a/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java b/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java index f088a36..5b17e78 100644 --- a/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java +++ b/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java @@ -20,17 +20,22 @@ package org.apache.kylin.common.util; import java.io.IOException; import java.net.InetAddress; +import java.net.InetSocketAddress; import java.net.URI; import java.net.URISyntaxException; import java.net.UnknownHostException; +import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.kylin.common.KylinConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,9 +44,15 @@ public class HadoopUtil { private static ThreadLocal hadoopConfig = new ThreadLocal<>(); + private static ThreadLocal hbaseConfig = new ThreadLocal<>(); + public static void setCurrentConfiguration(Configuration conf) { hadoopConfig.set(conf); } + + public static void setCurrentHBaseConfiguration(Configuration conf) { + hbaseConfig.set(conf); + } public static Configuration getCurrentConfiguration() { if (hadoopConfig.get() == null) { @@ -49,10 +60,112 @@ public class HadoopUtil { } return hadoopConfig.get(); } + + public static Configuration getCurrentHBaseConfiguration() { + if (hbaseConfig.get() == null) { + Configuration configuration = HBaseConfiguration.create(new Configuration()); + String hbaseClusterFs = KylinConfig.getInstanceFromEnv().getHBaseClusterFs(); + if (StringUtils.isNotEmpty(hbaseClusterFs)) { + configuration.set(FileSystem.FS_DEFAULT_NAME_KEY, hbaseClusterFs); + } + hbaseConfig.set(configuration); + } + return hbaseConfig.get(); + } public static FileSystem getFileSystem(String path) throws IOException { return FileSystem.get(makeURI(path), getCurrentConfiguration()); } + + public static String makeQualifiedPathInHadoopCluster(String path) { + try { + FileSystem fs = FileSystem.get(getCurrentConfiguration()); + return fs.makeQualified(new Path(path)).toString(); + } catch (IOException e) { + throw new IllegalArgumentException("Cannot create FileSystem from current hadoop cluster conf", e); + } + } + + public static Configuration convertCurrentConfig(String path) { + Configuration currentConfig = getCurrentConfiguration(); + if(path == null) + return currentConfig; + String nameService = currentConfig.get(FileSystem.FS_DEFAULT_NAME_KEY); + logger.debug("Current convert path " + path); + logger.debug("Current default name service " + nameService); + try { + URI pathUri = new URI(path); + String host = pathUri.getHost(); + //no not transform path with default name service. + if(nameService != null) { + URI defaultUri = new URI(nameService); + if(pathUri.getScheme().equalsIgnoreCase(defaultUri.getScheme()) && host.equalsIgnoreCase(defaultUri.getHost())) { + return currentConfig; + } + } + //get namespace to real name node map.. + Map> map = DFSUtil.getHaNnRpcAddresses(currentConfig); + Configuration tmpConfig = null; + Map addressesInNN = map.get(host); + //if do not exist this namespace, such as we use real name node + if(addressesInNN == null) + return currentConfig; + for(InetSocketAddress addr : addressesInNN.values()) { + String name = addr.getHostName(); + int port = addr.getPort(); + String target = String.format("hdfs://%s:%d/", name, port); + tmpConfig = new Configuration(currentConfig); + tmpConfig.set(FileSystem.FS_DEFAULT_NAME_KEY, target); + FileSystem tmpFs = FileSystem.get(tmpConfig); + try { + //try every real name node if it is standby server. + tmpFs.listFiles(new Path("/"), false); + logger.debug("Transorm path nameservice " + host + " to real name node " + target); + return tmpConfig; + } catch (Exception e) { + logger.warn(String.format("Hbase hadoop namenode %s, real host %s is standby server !", + nameService, target)); + continue; + } + } + } catch (IOException e) { + throw new IllegalArgumentException("Cannot create FileSystem from current hbase cluster conf", e); + } catch (URISyntaxException e1) { + throw new IllegalArgumentException("Cannot create path to URI", e1); + } + return currentConfig; + } + + public static String transformHdfsPath(String path) { + //without schema, using default config + if(path == null || !path.startsWith("hdfs://")) + return path; + //check config if transform path.. + if(!KylinConfig.getInstanceFromEnv().isTransformNameService()) + return path; + try { + URI uri = new URI(path); + String rawPath = uri.getRawPath(); + Configuration newConfig = convertCurrentConfig(path); + if(newConfig == null) { + newConfig = getCurrentConfiguration(); + } + FileSystem fs = FileSystem.get(newConfig); + return fs.makeQualified(new Path(rawPath)).toString(); + } catch (Exception e) { + logger.warn("Transform path " + path + " error !", e); + return null; + } + } + + public static String makeQualifiedPathInHBaseCluster(String path) { + try { + FileSystem fs = FileSystem.get(getCurrentHBaseConfiguration()); + return fs.makeQualified(new Path(path)).toString(); + } catch (IOException e) { + throw new IllegalArgumentException("Cannot create FileSystem from current hbase cluster conf", e); + } + } public static URI makeURI(String filePath) { try { @@ -115,7 +228,11 @@ public class HadoopUtil { conf.set(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "5"); conf.set(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, "60000"); // conf.set(ScannerCallable.LOG_SCANNER_ACTIVITY, "true"); - + + String hbaseClusterFs = KylinConfig.getInstanceFromEnv().getHBaseClusterFs(); + if (StringUtils.isNotEmpty(hbaseClusterFs)) { + conf.set(FileSystem.FS_DEFAULT_NAME_KEY, hbaseClusterFs); + } return conf; } diff --git a/common/src/main/java/org/apache/kylin/common/util/HiveClient.java b/common/src/main/java/org/apache/kylin/common/util/HiveClient.java index 7f7a616..9ad4606 100644 --- a/common/src/main/java/org/apache/kylin/common/util/HiveClient.java +++ b/common/src/main/java/org/apache/kylin/common/util/HiveClient.java @@ -19,6 +19,8 @@ package org.apache.kylin.common.util; import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -46,12 +48,41 @@ public class HiveClient { protected Driver driver = null; protected HiveMetaStoreClient metaStoreClient = null; protected String type; - - public HiveClient() { + private final static String LOCAL_FS_SCHEMA = "file://"; + + //create hive client, if location is null using default config, or using specify config file + public HiveClient(String location) { + URL uri = null; + if(location != null) { + try { + uri = new URL(LOCAL_FS_SCHEMA + location); + } catch (MalformedURLException e) { + throw new IllegalArgumentException("Can not find hive config file " + location); + } + } + + //in HiveConf, hiveSiteURL is a static variable, so we should use a global lock. + synchronized(HiveClient.class) { + if(uri != null) { + hiveConf.setHiveSiteLocation(uri); + } else { + hiveConf.setHiveSiteLocation(Thread.currentThread().getContextClassLoader().getResource("hive-site.xml")); + } + hiveConf = new HiveConf(HiveClient.class); + } + } + + //do not call this constructor any more + private HiveClient() { hiveConf = new HiveConf(HiveClient.class); } + + public HiveClient(Map configMap, String location) { + this(location); + appendConfiguration(configMap); + } - public HiveClient(Map configMap) { + private HiveClient(Map configMap) { this(); appendConfiguration(configMap); } diff --git a/common/src/main/java/org/apache/kylin/common/util/HiveManager.java b/common/src/main/java/org/apache/kylin/common/util/HiveManager.java new file mode 100644 index 0000000..d47e76a --- /dev/null +++ b/common/src/main/java/org/apache/kylin/common/util/HiveManager.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// by hzfengyu +package org.apache.kylin.common.util; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.apache.kylin.common.KylinConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class HiveManager { + private static Map CACHE = new HashMap(); + private static final Logger logger = LoggerFactory.getLogger(HiveManager.class); + private static final String DEFAULT_HIVE_NAME = "default"; + private static final String HIVE_CONFIG_FILE_NAME = "hive-site.xml"; + private static final String DEFAULT_HIVE_COMMAND = "hive"; + + //thread local project name, specify the project current thread used + public static final ThreadLocal project = new ThreadLocal(); + + //hive root directory, if this equals to null or empty meaning just use default hive + private File hiveRootDir = null; + //never delete from this map, hive config do not change ! + private Map hiveClientMap = null; + private HiveClient defaultHiveClient = null; + + public static HiveManager getInstance() { + //using default kylin config + KylinConfig config = KylinConfig.getInstanceFromEnv(); + HiveManager r = CACHE.get(config); + if (r == null) { + r = new HiveManager(config); + CACHE.put(config, r); + if (CACHE.size() > 1) { + logger.warn("More than one singleton HiveManager exist"); + } + + } + return r; + } + + public String getCurrentProject() { + String projectName = project.get(); + if(projectName == null) { + logger.error("Can not find project parameter in current local thread" + getCurrentThreadStack()); + } + return projectName; + } + + private String getCurrentThreadStack() { + StringBuffer sb = new StringBuffer(); + StackTraceElement[] stackElements = Thread.currentThread().getStackTrace(); + if (stackElements != null) { + for (int i = 0; i < stackElements.length; i++) { + sb.append("\t").append("at ").append(stackElements[i].getClassName()).append(stackElements[i].getMethodName()). + append("(").append(stackElements[i].getFileName()).append(":"). + append(stackElements[i].getLineNumber()).append(")").append("\n"); + } + } + return sb.toString(); + } + + public void setCurrentProject(String projectName) { + project.set(projectName); + } + + private HiveManager(KylinConfig config) { + String rootDir = config.getHiveRootDirectory(); + if(rootDir != null && !rootDir.isEmpty()) { + File file = new File(rootDir); + //check to ensure hive root file exist + this.hiveRootDir = file; + if(!file.exists()) { + logger.warn("Hive root directory " + file.getAbsolutePath() + " do not exist !"); + this.hiveRootDir = null; + } + } else { + this.hiveRootDir = null; + } + this.hiveClientMap = new HashMap(); + } + + public List getAllHiveName() { + List hiveNames = new LinkedList(); + hiveNames.add(DEFAULT_HIVE_NAME); + if(this.hiveRootDir == null) + return hiveNames; + + //take every diectory in hive root dir is a hive source. take directory name as hive name + for(File file : this.hiveRootDir.listFiles()) { + if(!file.isDirectory()) { + logger.warn("File " + file.getAbsolutePath() + " in hive root directory is normal file."); + continue; + } + hiveNames.add(file.getName()); + } + return hiveNames; + } + + private String getHiveConfigFile(String hiveName) { + if(this.hiveRootDir == null || hiveName == null) + return null; + + File hiveRootFile = new File(this.hiveRootDir, hiveName); + if(!hiveRootFile.exists() || !hiveRootFile.isDirectory()) { + logger.warn("Hive " + hiveName + " root directory " + hiveRootFile.getAbsolutePath() + " do not exist."); + return null; + } + File hiveConfDir = new File(hiveRootFile, "conf"); + if(!hiveConfDir.exists() || !hiveConfDir.isDirectory() ) { + logger.warn("Hive " + hiveName + " config dirctory " + hiveConfDir.getAbsolutePath() + " do not exist."); + return null; + } + File hiveConfigFile = new File(hiveConfDir, HIVE_CONFIG_FILE_NAME); + if(!hiveConfigFile.exists() || !hiveConfigFile.isFile()) { + logger.warn("Hive " + hiveName + " config file " + hiveConfigFile.getAbsolutePath() + " do not exist."); + return null; + } + return hiveConfigFile.getAbsolutePath(); + } + + public HiveClient createHiveClient(String hiveName) throws IOException { + if(hiveName == null) { + if(defaultHiveClient == null) + defaultHiveClient = new HiveClient(null); + return this.defaultHiveClient; + } + if(this.hiveRootDir == null) { + throw new IOException("Do not support hive " + hiveName + " except default one"); + } + + HiveClient client = this.hiveClientMap.get(hiveName); + if(client != null) + return client; + String configFileLocation = getHiveConfigFile(hiveName); + if(configFileLocation == null) { + throw new IOException("Can not find hive " + hiveName + " config file in local hive root directory " + this.hiveRootDir.getAbsolutePath()); + } + try { + client = new HiveClient(configFileLocation); + this.hiveClientMap.put(hiveName, client); + return client; + } catch (Exception e) { + throw new IOException("Can not create hive client for " + hiveName + ", config file " + configFileLocation); + } + } + + public HiveClient createHiveClient(Map configMap, String hiveName) { + String configFileLocation = getHiveConfigFile(hiveName); + + return new HiveClient(configMap, configFileLocation); + } + + public String getHiveCommand(String hiveName) { + if(this.hiveRootDir == null || hiveName == null) + return DEFAULT_HIVE_COMMAND; + + File hiveRootFile = new File(this.hiveRootDir, hiveName); + if(!hiveRootFile.exists() || !hiveRootFile.isDirectory()) { + logger.warn("Hive " + hiveName + " root directory " + hiveRootFile.getAbsolutePath() + " do not exist."); + return null; + } + File hiveBinDir = new File(hiveRootFile, "bin"); + if(!hiveBinDir.exists() || !hiveBinDir.isDirectory() ) { + logger.warn("Hive " + hiveName + " bin dirctory " + hiveBinDir.getAbsolutePath() + " do not exist."); + return null; + } + File hiveCmdFile = new File(hiveBinDir, DEFAULT_HIVE_COMMAND); + if(!hiveCmdFile.exists() || !hiveCmdFile.isFile()) { + logger.warn("Hive " + hiveName + " bin file " + hiveCmdFile.getAbsolutePath() + " do not exist."); + return null; + } + return hiveCmdFile.getAbsolutePath(); + } + + public String getHiveTableLocation(String tableName, String hiveName) throws Exception { + String[] tables = HadoopUtil.parseHiveTableName(tableName); + + HiveClient hiveClient = createHiveClient(hiveName); + try { + String tableLocation = hiveClient.getHiveTableLocation(tables[0], tables[1]); + return tableLocation; + } catch (Exception e) { + logger.error("Get hive table " + tableName + " location error !"); + throw e; + } + } +} diff --git a/common/src/test/java/org/apache/kylin/common/util/HadoopUtilTest.java b/common/src/test/java/org/apache/kylin/common/util/HadoopUtilTest.java new file mode 100644 index 0000000..e0ef20a --- /dev/null +++ b/common/src/test/java/org/apache/kylin/common/util/HadoopUtilTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.kylin.common.util; + + import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.fs.FileSystem; + import org.apache.kylin.common.KylinConfig; + import org.junit.After; + import org.junit.BeforeClass; + import org.junit.Test; + + + import static org.junit.Assert.*; + + /** + * Created by sunyerui on 15/8/26. + * Tests for HadoopUtil + */ + public class HadoopUtilTest { + + @BeforeClass + public static void beforeClass() { + System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox"); + } + + @After + public void after() { + HadoopUtil.setCurrentConfiguration(null); + HadoopUtil.setCurrentHBaseConfiguration(null); + } + + @Test + public void testGetCurrentHBaseConfiguration() throws Exception { + KylinConfig config = KylinConfig.getInstanceFromEnv(); + config.setProperty(KylinConfig.KYLIN_HBASE_CLUSTER_FS, "hdfs://hbase-cluster/"); + + Configuration conf = HadoopUtil.getCurrentHBaseConfiguration(); + assertEquals("hdfs://hbase-cluster/", conf.get(FileSystem.FS_DEFAULT_NAME_KEY)); + } + + @Test + public void testMakeQualifiedPathInHBaseCluster() throws Exception { + KylinConfig config = KylinConfig.getInstanceFromEnv(); + config.setProperty(KylinConfig.KYLIN_HBASE_CLUSTER_FS, "file:/"); + + String path = HadoopUtil.makeQualifiedPathInHBaseCluster("/path/to/test/hbase"); + assertEquals("file:/path/to/test/hbase", path); + } + } \ No newline at end of file diff --git a/conf/kylin.properties b/conf/kylin.properties index 3550597..1b8ce5c 100644 --- a/conf/kylin.properties +++ b/conf/kylin.properties @@ -28,6 +28,20 @@ kylin.storage.url=hbase # Temp folder in hdfs, make sure user has the right access to the hdfs directory kylin.hdfs.working.dir=/kylin + +# HBase Cluster FileSystem, which serving hbase, format as hdfs://hbase-cluster/ +# leave empty if hbase running on same cluster with hive and mapreduce +kylin.hbase.cluster.fs= + +# all hive env root directory, the name of file is the name of this hive. +kylin.hive.root.directory= + +# transform nameservice to real namenode, if your hadoop cluster diff with hdfs that +# hbase/hive using, and your hadoop cluster config do not contain those hdfs. +kylin.transform.nameservice.enable= + +# config to whether copy intermediate table from hive hadoop to local hadoop. +kylin.copy.intermediate.table.enable=false kylin.job.mapreduce.default.reduce.input.mb=500 @@ -44,6 +58,12 @@ kylin.job.remote.cli.username= # Only necessary when kylin.job.run.as.remote.cmd=true kylin.job.remote.cli.password= +# if access RM server use kerberos authentication +kylin.job.status.with.kerberos= + +# if you deploy hadoop cluster without jars which kylin dependency, set it true +kylin.job.upload.jars.enabled= + # Used by test cases to prepare synthetic data for sample cube kylin.job.remote.cli.working.dir=/tmp/kylin @@ -115,9 +135,3 @@ ext.log.base.dir = /tmp/kylin_log1,/tmp/kylin_log2 #will create external hive table to query result csv file #will set to kylin_query_log by default if not config here query.log.parse.result.table = kylin_query_log - -#if you should getting job status from RM with kerberos, set it true.. -kylin.job.status.with.kerberos=false - -#if you deploy hadoop cluster without jars which kylin dependency, set it true -kylin.job.upload.jars.enabled=false \ No newline at end of file -- 1.9.4.msysgit.2