From 2988f89e5673f2c2c448cfb7fe19c6eb5ee0141f Mon Sep 17 00:00:00 2001
From: sunyerui
Date: Thu, 21 Jul 2016 17:41:32 +0800
Subject: [PATCH] KYLIN-1910 Support Separate HBase Cluster with NN HA and Kerberos Authentication

---
Notes (commentary only, not part of the commit message):

 * KylinConfigBase gains getHBaseClusterHDFSConfigFile(), which reads the
   optional setting "kylin.hbase.cluster.hdfs.config.file" (default: "").
 * HBaseConnection.addHBaseClusterNNHAConfiguration(conf) is a no-op when
   that setting is null or empty. Otherwise it loads the named resource into
   a fresh Configuration(false) and, for every name service listed there
   under DFSConfigKeys.DFS_NAMESERVICES, copies into conf:
     - the HA namenode list   (DFS_HA_NAMENODES_KEY_PREFIX.<serviceId>)
     - the failover provider  (DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX.<serviceId>)
     - each namenode address  (DFS_NAMENODE_RPC_ADDRESS_KEY.<serviceId>.<nameNode>)
   Keys missing from the resource are written as empty strings. The name
   services are then appended to conf's own DFS_NAMESERVICES list and also
   stored under "mapreduce.job.hdfs-servers.token-renewal.exclude"
   (see YARN-3021).
 * The helper is called from newHBaseConfiguration() and from CubeHFileJob
   right after setJobClasspath().
 * The Collection locals are declared as Collection<String>; the enhanced
   for loops over String elements require the element type.

 .../org/apache/kylin/common/KylinConfigBase.java   |  4 +++
 .../kylin/storage/hbase/HBaseConnection.java       | 34 ++++++++++++++++++++++
 .../kylin/storage/hbase/steps/CubeHFileJob.java    |  2 ++
 3 files changed, 40 insertions(+)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 312ecbf..790d0f0 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -218,6 +218,10 @@ abstract public class KylinConfigBase implements Serializable {
         return getOptional("kylin.hbase.cluster.fs", "");
     }
 
+    public String getHBaseClusterHDFSConfigFile() {
+        return getOptional("kylin.hbase.cluster.hdfs.config.file", "");
+    }
+
     public String getKylinJobLogDir() {
         return getOptional("kylin.job.log.dir", "/tmp/kylin/logs");
     }
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
index d4dd3ae..e93c2bd 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
@@ -20,6 +20,7 @@ package org.apache.kylin.storage.hbase;
 
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
@@ -43,6 +44,7 @@ import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.StorageException;
 import org.apache.kylin.engine.mr.HadoopUtil;
@@ -140,6 +142,7 @@ public class HBaseConnection {
 
     private static Configuration newHBaseConfiguration(String url) {
         Configuration conf = HBaseConfiguration.create(HadoopUtil.getCurrentConfiguration());
+        addHBaseClusterNNHAConfiguration(conf);
 
         // using a hbase:xxx URL is deprecated, instead hbase config is always loaded from hbase-site.xml in classpath
         if (!(StringUtils.isEmpty(url) || "hbase".equals(url)))
@@ -168,6 +171,37 @@ public class HBaseConnection {
         return conf;
     }
 
+    // See YARN-3021. Copy here in case of missing in dependency MR client jars
+    public static final String JOB_NAMENODES_TOKEN_RENEWAL_EXCLUDE = "mapreduce.job.hdfs-servers.token-renewal.exclude";
+
+    public static void addHBaseClusterNNHAConfiguration(Configuration conf) {
+        String hdfsConfigFile = KylinConfig.getInstanceFromEnv().getHBaseClusterHDFSConfigFile();
+        if (hdfsConfigFile == null || hdfsConfigFile.isEmpty()) {
+            return;
+        }
+        Configuration hdfsConf = new Configuration(false);
+        hdfsConf.addResource(hdfsConfigFile);
+        Collection<String> nameServices = hdfsConf.getTrimmedStringCollection(DFSConfigKeys.DFS_NAMESERVICES);
+        Collection<String> mainNameServices = conf.getTrimmedStringCollection(DFSConfigKeys.DFS_NAMESERVICES);
+        for (String serviceId : nameServices) {
+            mainNameServices.add(serviceId);
+
+            String serviceConfKey = DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + serviceId;
+            String proxyConfKey = DFSConfigKeys.DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." + serviceId;
+            conf.set(serviceConfKey, hdfsConf.get(serviceConfKey, ""));
+            conf.set(proxyConfKey, hdfsConf.get(proxyConfKey, ""));
+
+            Collection<String> nameNodes = hdfsConf.getTrimmedStringCollection(serviceConfKey);
+            for (String nameNode : nameNodes) {
+                String rpcConfKey = DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + serviceId + "." + nameNode;
+                conf.set(rpcConfKey, hdfsConf.get(rpcConfKey, ""));
+            }
+        }
+        conf.setStrings(DFSConfigKeys.DFS_NAMESERVICES, mainNameServices.toArray(new String[0]));
+        // See YARN-3021, instruct RM skip renew token of hbase cluster name services
+        conf.setStrings(JOB_NAMENODES_TOKEN_RENEWAL_EXCLUDE, nameServices.toArray(new String[0]));
+    }
+
     public static String makeQualifiedPathInHBaseCluster(String path) {
         try {
             FileSystem fs = FileSystem.get(getCurrentHBaseConfiguration());
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
index e4a9c1e..9145cef 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
@@ -42,6 +42,7 @@ import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -75,6 +76,7 @@ public class CubeHFileJob extends AbstractHadoopJob {
             job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
 
             setJobClasspath(job, cube.getConfig());
+            HBaseConnection.addHBaseClusterNNHAConfiguration(job.getConfiguration());
 
             addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
             FileOutputFormat.setOutputPath(job, output);
-- 
2.3.2 (Apple Git-55)