From e7db57cf8cb2434d0c2e66b873acc32c42dbbe43 Mon Sep 17 00:00:00 2001
From: fengyu
Date: Mon, 4 Jan 2016 12:10:32 +0800
Subject: [PATCH] transform path in other HDFS to real name node path

Signed-off-by: fengyu
---
 .../org/apache/kylin/engine/mr/HadoopUtil.java     | 105 +++++++++++++++++++++
 .../org/apache/kylin/engine/spark/SparkCubing.java |   2 +
 .../kylin/storage/hbase/steps/CubeHFileJob.java    |   2 +
 3 files changed, 109 insertions(+)

diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java
index 9ce2bab..f6ef877 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java
@@ -21,18 +21,25 @@ package org.apache.kylin.engine.mr;
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.Map;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.io.Writable;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.log4j.Logger;
 
 public class HadoopUtil {
     private static final ThreadLocal<Configuration> hadoopConfig = new ThreadLocal<>();
+    private static final Logger logger = Logger.getLogger(HadoopUtil.class);
 
     public static void setCurrentConfiguration(Configuration conf) {
         hadoopConfig.set(conf);
@@ -115,5 +122,103 @@ public class HadoopUtil {
             throw new RuntimeException(e);
         }
     }
+
+    /**
+     * Returns a configuration whose default file system is the name node that
+     * answered a probe for the HA nameservice named in {@code path}. That
+     * configuration is a fresh {@link Configuration}, not a copy of the current one.
+     *
+     * The current configuration is returned unchanged when {@code path} is
+     * {@code null}, has no host, already points at the default file system, or
+     * names a host that is not a configured HA nameservice, and also when none of
+     * the nameservice's name nodes answers the probe.
+     *
+     * @param path URI string of the path to resolve; may be {@code null}
+     * @return the configuration to use for {@code path}
+     * @throws IllegalArgumentException if {@code path} or the default file system
+     *         is not a valid URI, or a FileSystem cannot be created
+     */
+    public static Configuration convertCurrentConfig(String path) {
+        Configuration currentConfig = getCurrentConfiguration();
+        if (path == null) {
+            return currentConfig;
+        }
+        String nameService = currentConfig.get(FileSystem.FS_DEFAULT_NAME_KEY);
+        logger.debug("Current convert path " + path);
+        logger.debug("Current default name service " + nameService);
+        try {
+            URI pathUri = new URI(path);
+            String host = pathUri.getHost();
+            // A path without a host names no nameservice, so there is nothing to resolve.
+            if (host == null) {
+                return currentConfig;
+            }
+            // Do not transform a path that is already on the default name service.
+            if (nameService != null) {
+                URI defaultUri = new URI(nameService);
+                String scheme = pathUri.getScheme();
+                if (scheme != null && scheme.equalsIgnoreCase(defaultUri.getScheme())
+                        && host.equalsIgnoreCase(defaultUri.getHost())) {
+                    return currentConfig;
+                }
+            }
+            // Nameservice id -> (name node id -> RPC address) for the configured HA nameservices.
+            Map<String, Map<String, InetSocketAddress>> map = DFSUtil.getHaNnRpcAddresses(currentConfig);
+            Map<String, InetSocketAddress> addressesInNN = map.get(host);
+            // The host is not an HA nameservice, e.g. the path already names a real name node.
+            if (addressesInNN == null) {
+                return currentConfig;
+            }
+            for (InetSocketAddress addr : addressesInNN.values()) {
+                String target = String.format("%s://%s:%d/", HdfsConstants.HDFS_URI_SCHEME,
+                        addr.getHostName(), addr.getPort());
+                Configuration tmpConfig = new Configuration();
+                tmpConfig.set(FileSystem.FS_DEFAULT_NAME_KEY, target);
+                // FileSystem.get() can return a cached, shared instance, so it is not closed here.
+                FileSystem tmpFs = FileSystem.get(tmpConfig);
+                try {
+                    // Probe each real name node; a standby is expected to reject the listing.
+                    tmpFs.listFiles(new Path("/"), false);
+                    logger.debug("Transform path nameservice " + host + " to real name node " + target);
+                    return tmpConfig;
+                } catch (Exception e) {
+                    // Log the cause as well: the probe can fail for reasons other than standby state.
+                    logger.warn(String.format("Hbase hadoop namenode %s, real host %s is standby server !",
+                            nameService, target), e);
+                }
+            }
+        } catch (IOException e) {
+            throw new IllegalArgumentException("Cannot create FileSystem from current hbase cluster conf", e);
+        } catch (URISyntaxException e1) {
+            throw new IllegalArgumentException("Cannot create path to URI", e1);
+        }
+        return currentConfig;
+    }
+
+    /**
+     * Transforms a path outside the default name service (such as an HFile
+     * location on the HBase HDFS) into a path qualified against the file system
+     * selected by {@link #convertCurrentConfig(String)}.
+     *
+     * @param path the path to transform; may be {@code null}
+     * @return the qualified path, or {@code path} itself when it is {@code null},
+     *         does not start with the hdfs scheme, or cannot be transformed
+     */
+    public static Path transformPathToNN(Path path) {
+        // Without the hdfs scheme the default configuration applies as is.
+        if (path == null || !path.toString().startsWith(HdfsConstants.HDFS_URI_SCHEME)) {
+            return path;
+        }
+
+        try {
+            Configuration newConfig = convertCurrentConfig(path.toString());
+            FileSystem fs = FileSystem.get(newConfig);
+            // Path.toUri().getPath() is the decoded path without scheme and authority.
+            return fs.makeQualified(new Path(path.toUri().getPath()));
+        } catch (Exception e) {
+            // The callers in this patch use the result directly, so fall back to the input path.
+            logger.warn("Transform path " + path + " error !", e);
+            return path;
+        }
+    }
 
 }
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
index efd3173..f537fea 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
@@ -78,6 +78,7 @@ import org.apache.kylin.cube.util.CubingUtils;
 import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.dict.DictionaryGenerator;
 import org.apache.kylin.dict.IterableDictionaryValueEnumerator;
+import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.engine.mr.common.CubeStatsReader;
 import org.apache.kylin.engine.spark.cube.BufferedCuboidWriter;
 import org.apache.kylin.engine.spark.cube.DefaultTupleConverter;
@@ -390,6 +391,7 @@ public class SparkCubing extends AbstractApplication {
         KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
         Configuration conf = getConfigurationForHFile(cubeSegment.getStorageLocationIdentifier());
         Path path = new Path(kylinConfig.getHdfsWorkingDirectory(), "hfile_" + UUID.randomUUID().toString());
+        path = HadoopUtil.transformPathToNN(path);
         Preconditions.checkArgument(!FileSystem.get(conf).exists(path));
         String url = conf.get("fs.defaultFS") + path.toString();
         System.out.println("use " + url + " as hfile");
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
index 1f0b1a0..e62d179 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.slf4j.Logger;
@@ -57,6 +58,7 @@ public class CubeHFileJob extends AbstractHadoopJob {
             parseOptions(options, args);
 
             Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+            output = HadoopUtil.transformPathToNN(output);
             String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
 
             CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
-- 
1.9.4.msysgit.2