From 895c0379da9e284fdfbd4cadca5bb83920226947 Mon Sep 17 00:00:00 2001
From: shaofengshi
Date: Mon, 7 Sep 2015 17:00:47 +0800
Subject: [PATCH] KYLIN-957 remove "kylin.hadoop.cluster.fs"; Kylin always runs in the MR cluster, so there is no need to configure it
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 bin/check-env.sh                                   | 11 ++------
 bin/sample.sh                                      |  5 ----
 .../java/org/apache/kylin/common/KylinConfig.java  |  6 ----
 .../org/apache/kylin/common/util/HadoopUtil.java   | 11 ++------
 .../apache/kylin/common/util/HadoopUtilTest.java   | 18 ------------
 conf/kylin.properties                              |  4 ---
 .../org/apache/kylin/job/AbstractJobBuilder.java   |  2 --
 .../apache/kylin/job/cube/CubingJobBuilder.java    |  8 ++----
 .../kylin/job/cube/GarbageCollectionStep.java      | 33 +++++++++-------------
 9 files changed, 22 insertions(+), 76 deletions(-)

diff --git a/bin/check-env.sh b/bin/check-env.sh
index 56f2436..d3cd709 100644
--- a/bin/check-env.sh
+++ b/bin/check-env.sh
@@ -46,17 +46,10 @@ then
 fi
 
 WORKING_DIR=`sh $KYLIN_HOME/bin/get-properties.sh kylin.hdfs.working.dir`
-HADOOP_FS=`sh $KYLIN_HOME/bin/get-properties.sh kylin.hadoop.cluster.fs`
-
-if [ "$HADOOP_FS" ]
-then
-    hadoop fs -Dfs.defaultFS=$HADOOP_FS -mkdir -p $WORKING_DIR
-else
-    hadoop fs -mkdir -p $WORKING_DIR
-fi
+hadoop fs -mkdir -p $WORKING_DIR
 if [ $? != 0 ]
 then
-    echo "failed to create $HADOOP_FS$WORKING_DIR, Please make sure the user has right to access $HADOOP_FS$WORKING_DIR"
+    echo "Failed to create $WORKING_DIR. Please make sure the user has permission to access $WORKING_DIR"
     exit 1
 fi

diff --git a/bin/sample.sh b/bin/sample.sh
index 7c4999f..d53393c 100644
--- a/bin/sample.sh
+++ b/bin/sample.sh
@@ -20,14 +20,9 @@
 dir=$(dirname ${0})
 source ${dir}/check-env.sh
 job_jar=`find ${KYLIN_HOME}/lib/ -name kylin-job*.jar`
-HADOOP_FS=`sh $KYLIN_HOME/bin/get-properties.sh kylin.hadoop.cluster.fs`
 echo "Going to create sample tables in hive..."
 cd ${KYLIN_HOME}/sample_cube/data
-if [ -z $HADOOP_FS ];then
 hive -f ${KYLIN_HOME}/sample_cube/create_sample_tables.sql || { exit 1; }
-else
-hive -hiveconf fs.defaultFS=${HADOOP_FS} -f ${KYLIN_HOME}/sample_cube/create_sample_tables.sql || { exit 1; }
-fi
 
 echo "Sample hive tables are created successfully; Going to create sample cube..."
 cd ${KYLIN_HOME}
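Note on the two script changes above: both scripts now rely on the Hadoop client's own configuration to find the filesystem, instead of threading $HADOOP_FS through -Dfs.defaultFS and -hiveconf. A minimal sketch of the behavior they depend on follows; the class name is illustrative and not part of this patch. A plain Hadoop Configuration loads fs.defaultFS from the core-site.xml on the classpath, so "hadoop fs -mkdir" and "hive -f" already target the MR cluster's filesystem.

    // Illustrative sketch only (not part of this patch): a vanilla
    // Configuration resolves fs.defaultFS from core-site.xml, which is
    // exactly what check-env.sh and sample.sh now rely on.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    public class DefaultFsProbe {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration(); // reads core-site.xml from the classpath
            // Second argument is the fallback when no core-site.xml is found
            // (the local filesystem).
            System.out.println(conf.get(FileSystem.FS_DEFAULT_NAME_KEY, "file:///"));
            System.out.println(FileSystem.get(conf).getUri());
        }
    }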
diff --git a/common/src/main/java/org/apache/kylin/common/KylinConfig.java b/common/src/main/java/org/apache/kylin/common/KylinConfig.java
index f1a8e92..d3220ee 100644
--- a/common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -94,8 +94,6 @@ public class KylinConfig {
 
     public static final String KYLIN_HDFS_WORKING_DIR = "kylin.hdfs.working.dir";
 
-    public static final String KYLIN_HADOOP_CLUSTER_FS = "kylin.hadoop.cluster.fs";
-
     public static final String KYLIN_HBASE_CLUSTER_FS = "kylin.hbase.cluster.fs";
 
     public static final String HIVE_PASSWORD = "hive.password";
@@ -291,10 +289,6 @@ public class KylinConfig {
         return root + getMetadataUrlPrefix() + "/";
     }
 
-    public String getHadoopClusterFs() {
-        return getOptional(KYLIN_HADOOP_CLUSTER_FS, "");
-    }
-
     public String getHBaseClusterFs() {
         return getOptional(KYLIN_HBASE_CLUSTER_FS, "");
     }

diff --git a/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java b/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
index 43b2f29..b67b343 100644
--- a/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
+++ b/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
@@ -53,12 +53,7 @@ public class HadoopUtil {
 
     public static Configuration getCurrentConfiguration() {
         if (hadoopConfig.get() == null) {
-            Configuration configuration = new Configuration();
-            String hadoopClusterFs = KylinConfig.getInstanceFromEnv().getHadoopClusterFs();
-            if (hadoopClusterFs != null && !hadoopClusterFs.equals("")) {
-                configuration.set(FileSystem.FS_DEFAULT_NAME_KEY, hadoopClusterFs);
-            }
-            hadoopConfig.set(configuration);
+            hadoopConfig.set(new Configuration());
        }
         return hadoopConfig.get();
     }
@@ -67,7 +62,7 @@ public class HadoopUtil {
         if (hbaseConfig.get() == null) {
             Configuration configuration = HBaseConfiguration.create(new Configuration());
             String hbaseClusterFs = KylinConfig.getInstanceFromEnv().getHBaseClusterFs();
-            if (hbaseClusterFs != null && !hbaseClusterFs.equals("")) {
+            if (StringUtils.isNotEmpty(hbaseClusterFs)) {
                 configuration.set(FileSystem.FS_DEFAULT_NAME_KEY, hbaseClusterFs);
             }
             hbaseConfig.set(configuration);
@@ -160,7 +155,7 @@ public class HadoopUtil {
         // conf.set(ScannerCallable.LOG_SCANNER_ACTIVITY, "true");
 
         String hbaseClusterFs = KylinConfig.getInstanceFromEnv().getHBaseClusterFs();
-        if (hbaseClusterFs != null && !hbaseClusterFs.equals("")) {
+        if (StringUtils.isNotEmpty(hbaseClusterFs)) {
             conf.set(FileSystem.FS_DEFAULT_NAME_KEY, hbaseClusterFs);
         }
         return conf;
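Note on the HadoopUtil changes above: the remaining checks assume the commons-lang StringUtils that Kylin already uses is imported in HadoopUtil.java; the call is a drop-in replacement for the hand-rolled null/empty test it replaces. A small sketch of the equivalence (class name illustrative, not part of this patch):

    // Illustrative sketch: StringUtils.isNotEmpty(s) is equivalent to the
    // replaced condition s != null && !s.equals("").
    import org.apache.commons.lang.StringUtils;

    public class IsNotEmptyDemo {
        public static void main(String[] args) {
            for (String s : new String[] { null, "", "hdfs://hbase-cluster:8020" }) {
                boolean oldCheck = s != null && !s.equals("");
                boolean newCheck = StringUtils.isNotEmpty(s);
                System.out.println(s + ": " + oldCheck + " == " + newCheck); // always equal
            }
        }
    }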
diff --git a/common/src/test/java/org/apache/kylin/common/util/HadoopUtilTest.java b/common/src/test/java/org/apache/kylin/common/util/HadoopUtilTest.java
index c380933..8587683 100644
--- a/common/src/test/java/org/apache/kylin/common/util/HadoopUtilTest.java
+++ b/common/src/test/java/org/apache/kylin/common/util/HadoopUtilTest.java
@@ -46,15 +46,6 @@ public class HadoopUtilTest {
     }
 
     @Test
-    public void testGetCurrentConfiguration() throws Exception {
-        KylinConfig config = KylinConfig.getInstanceFromEnv();
-        config.setProperty(KylinConfig.KYLIN_HADOOP_CLUSTER_FS, "hdfs://hadoop-cluster/");
-
-        Configuration conf = HadoopUtil.getCurrentConfiguration();
-        assertEquals("hdfs://hadoop-cluster/", conf.get(FileSystem.FS_DEFAULT_NAME_KEY));
-    }
-
-    @Test
     public void testGetCurrentHBaseConfiguration() throws Exception {
         KylinConfig config = KylinConfig.getInstanceFromEnv();
         config.setProperty(KylinConfig.KYLIN_HBASE_CLUSTER_FS, "hdfs://hbase-cluster/");
@@ -64,15 +55,6 @@ public class HadoopUtilTest {
     }
 
     @Test
-    public void testMakeQualifiedPathInHadoopCluster() throws Exception {
-        KylinConfig config = KylinConfig.getInstanceFromEnv();
-        config.setProperty(KylinConfig.KYLIN_HADOOP_CLUSTER_FS, "file:/");
-
-        String path = HadoopUtil.makeQualifiedPathInHadoopCluster("/path/to/test/hadoop");
-        assertEquals("file:/path/to/test/hadoop", path);
-    }
-
-    @Test
     public void testMakeQualifiedPathInHBaseCluster() throws Exception {
         KylinConfig config = KylinConfig.getInstanceFromEnv();
         config.setProperty(KylinConfig.KYLIN_HBASE_CLUSTER_FS, "file:/");

diff --git a/conf/kylin.properties b/conf/kylin.properties
index 0aa2898..84a1d46 100644
--- a/conf/kylin.properties
+++ b/conf/kylin.properties
@@ -29,10 +29,6 @@ kylin.storage.url=hbase
 # Temp folder in hdfs, make sure user has the right access to the hdfs directory
 kylin.hdfs.working.dir=/kylin
 
-# Hadoop Cluster FileSystem, which serving hive and mapreduce, format as hdfs://hadoop-cluster:8020
-# leave empty if using default fs configured by local core-site.xml
-kylin.hadoop.cluster.fs=
-
 # HBase Cluster FileSystem, which serving hbase, format as hdfs://hbase-cluster:8020
 # leave empty if hbase running on same cluster with hive and mapreduce
 kylin.hbase.cluster.fs=

diff --git a/job/src/main/java/org/apache/kylin/job/AbstractJobBuilder.java b/job/src/main/java/org/apache/kylin/job/AbstractJobBuilder.java
index ffbfe98..87c4705 100644
--- a/job/src/main/java/org/apache/kylin/job/AbstractJobBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/AbstractJobBuilder.java
@@ -65,7 +65,6 @@ public abstract class AbstractJobBuilder {
 
     protected AbstractExecutable createIntermediateHiveTableStep(IJoinedFlatTableDesc intermediateTableDesc, String jobId) {
         final String useDatabaseHql = "USE " + engineConfig.getConfig().getHiveDatabaseForIntermediateTable() + ";";
-        final String setClusterHql = "-hiveconf " + FileSystem.FS_DEFAULT_NAME_KEY + "=\"" + HadoopUtil.getCurrentConfiguration().get(FileSystem.FS_DEFAULT_NAME_KEY) + "\"";
         final String dropTableHql = JoinedFlatTable.generateDropTableStatement(intermediateTableDesc, jobId);
         final String createTableHql = JoinedFlatTable.generateCreateTableStatement(intermediateTableDesc, getJobWorkingDir(jobId), jobId);
         String insertDataHqls;
@@ -79,7 +78,6 @@ public abstract class AbstractJobBuilder {
         ShellExecutable step = new ShellExecutable();
         StringBuffer buf = new StringBuffer();
         buf.append("hive ");
-        buf.append(setClusterHql);
         buf.append(" -e \"");
         buf.append(useDatabaseHql + "\n");
         buf.append(dropTableHql + "\n");
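Note on the AbstractJobBuilder change above: with setClusterHql gone, the generated hive invocation no longer carries a -hiveconf fs.defaultFS override and simply inherits the cluster client configuration. A sketch of the command shape the builder now assembles; the HQL literals below are placeholders, not the statements JoinedFlatTable actually generates:

    // Illustrative sketch of the command emitted by
    // createIntermediateHiveTableStep after this patch; HQL strings are
    // simplified placeholders.
    public class HiveCmdShape {
        public static void main(String[] args) {
            StringBuffer buf = new StringBuffer();
            buf.append("hive ");
            buf.append(" -e \"");
            buf.append("USE default;" + "\n");                               // useDatabaseHql
            buf.append("DROP TABLE IF EXISTS kylin_intermediate_sample;" + "\n"); // dropTableHql
            buf.append("\"");
            System.out.println(buf.toString());
        }
    }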
-D" + FileSystem.FS_DEFAULT_NAME_KEY + "=" + HadoopUtil.getCurrentConfiguration().get(FileSystem.FS_DEFAULT_NAME_KEY); - builder.append(setCluster); } catch (IOException e) { throw new RuntimeException(e); } @@ -268,11 +266,11 @@ public final class CubingJobBuilder extends AbstractJobBuilder { } private String getRowkeyDistributionOutputPath(CubeSegment seg) { - return HadoopUtil.makeQualifiedPathInHBaseCluster(getJobWorkingDir(seg.getLastBuildJobID()) + "/" + seg.getCubeInstance().getName() + "/rowkey_stats"); + return getJobWorkingDir(seg.getLastBuildJobID()) + "/" + seg.getCubeInstance().getName() + "/rowkey_stats"; } private String getFactDistinctColumnsPath(CubeSegment seg, String jobUuid) { - return HadoopUtil.makeQualifiedPathInHadoopCluster(getJobWorkingDir(jobUuid) + "/" + seg.getCubeInstance().getName() + "/fact_distinct_columns"); + return getJobWorkingDir(jobUuid) + "/" + seg.getCubeInstance().getName() + "/fact_distinct_columns"; } private String getHFilePath(CubeSegment seg, String jobId) { diff --git a/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java b/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java index b4f6e8e..72cad96 100644 --- a/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java +++ b/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java @@ -84,8 +84,7 @@ public class GarbageCollectionStep extends AbstractExecutable { final String hiveTable = this.getOldHiveTable(); if (StringUtils.isNotEmpty(hiveTable)) { final String dropSQL = "USE " + KylinConfig.getInstanceFromEnv().getHiveDatabaseForIntermediateTable() + ";" + " DROP TABLE IF EXISTS " + hiveTable + ";"; - final String setClusterHql = "-hiveconf " + FileSystem.FS_DEFAULT_NAME_KEY + "=\"" + HadoopUtil.getCurrentConfiguration().get(FileSystem.FS_DEFAULT_NAME_KEY) + "\""; - final String dropHiveCMD = "hive " + setClusterHql + " -e \"" + dropSQL + "\""; + final String dropHiveCMD = "hive -e \"" + dropSQL + "\""; logger.info("executing: " + dropHiveCMD); ShellCmdOutput shellCmdOutput = new ShellCmdOutput(); context.getConfig().getCliCommandExecutor().execute(dropHiveCMD, shellCmdOutput); @@ -132,32 +131,28 @@ public class GarbageCollectionStep extends AbstractExecutable { } } } - - private void dropFileSystemPath(FileSystem fs, Path p) throws IOException { - Path path = fs.makeQualified(p); - if (fs.exists(path)) { - fs.delete(path, true); - logger.debug("Dropped HDFS path: " + path); - output.append("Dropped HDFS path \"" + path + "\" \n"); - } else { - logger.debug("HDFS path not exists: " + path); - output.append("HDFS path not exists: \"" + path + "\" \n"); - } - } - + private void dropHdfsPath(ExecutableContext context) throws IOException { + List oldHdfsPaths = this.getOldHdsfPaths(); if (oldHdfsPaths != null && oldHdfsPaths.size() > 0) { - FileSystem hadoopFs = FileSystem.get(HadoopUtil.getCurrentConfiguration()); - FileSystem hbaseFs = FileSystem.get(HadoopUtil.getCurrentHBaseConfiguration()); + Configuration hconf = HadoopUtil.getCurrentConfiguration(); + FileSystem fileSystem = FileSystem.get(hconf); for (String path : oldHdfsPaths) { if (path.endsWith("*")) path = path.substring(0, path.length() - 1); Path oldPath = new Path(path); - dropFileSystemPath(hadoopFs, oldPath); - dropFileSystemPath(hbaseFs, oldPath); + if (fileSystem.exists(oldPath)) { + fileSystem.delete(oldPath, true); + logger.debug("Dropped HDFS path: " + path); + output.append("Dropped HDFS path \"" + path + "\" \n"); + } else { + logger.debug("HDFS path not 
exists: " + path); + output.append("HDFS path not exists: \"" + path + "\" \n"); + } } + } } -- 1.9.3 (Apple Git-50)