From fc30e5f1114fbc540830ec2c267b75295f442733 Mon Sep 17 00:00:00 2001 From: sunyerui Date: Wed, 26 Aug 2015 18:39:03 +0800 Subject: [PATCH 1/2] KYLIN-957 Support HBase in a separate cluster --- .../java/org/apache/kylin/common/KylinConfig.java | 12 ++++ .../common/persistence/HBaseResourceStore.java | 4 +- .../org/apache/kylin/common/util/HadoopUtil.java | 49 ++++++++++++- .../common/persistence/HBaseResourceStoreTest.java | 2 +- .../apache/kylin/common/util/HadoopUtilTest.java | 83 ++++++++++++++++++++++ conf/kylin.properties | 8 +++ .../org/apache/kylin/job/AbstractJobBuilder.java | 8 ++- .../apache/kylin/job/cube/CubingJobBuilder.java | 13 ++-- .../kylin/job/cube/GarbageCollectionStep.java | 36 ++++++---- .../apache/kylin/job/hadoop/AbstractHadoopJob.java | 2 +- .../apache/kylin/job/hadoop/hbase/BulkLoadJob.java | 3 +- .../kylin/job/hadoop/hbase/CreateHTableJob.java | 3 +- .../kylin/job/tools/DeployCoprocessorCLI.java | 2 +- 13 files changed, 198 insertions(+), 27 deletions(-) create mode 100644 common/src/test/java/org/apache/kylin/common/util/HadoopUtilTest.java diff --git a/common/src/main/java/org/apache/kylin/common/KylinConfig.java b/common/src/main/java/org/apache/kylin/common/KylinConfig.java index deb2eda..76031c2 100644 --- a/common/src/main/java/org/apache/kylin/common/KylinConfig.java +++ b/common/src/main/java/org/apache/kylin/common/KylinConfig.java @@ -94,6 +94,10 @@ public class KylinConfig { public static final String KYLIN_HDFS_WORKING_DIR = "kylin.hdfs.working.dir"; + public static final String KYLIN_HADOOP_CLUSTER_FS = "kylin.hadoop.cluster.fs"; + + public static final String KYLIN_HBASE_CLUSTER_FS = "kylin.hbase.cluster.fs"; + public static final String HIVE_PASSWORD = "hive.password"; public static final String HIVE_USER = "hive.user"; @@ -280,6 +284,14 @@ public class KylinConfig { return root + getMetadataUrlPrefix() + "/"; } + public String getHadoopClusterFs() { + return getOptional(KYLIN_HADOOP_CLUSTER_FS, ""); + } + + public String getHBaseClusterFs() { + return getOptional(KYLIN_HBASE_CLUSTER_FS, ""); + } + public String getKylinJobLogDir() { return getOptional(KYLIN_JOB_LOG_DIR, "/tmp/kylin/logs"); } diff --git a/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java b/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java index e2a4b12..37b8f8d 100644 --- a/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java +++ b/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java @@ -179,7 +179,7 @@ public class HBaseResourceStore extends ResourceStore { byte[] value = r.getValue(B_FAMILY, B_COLUMN); if (value.length == 0) { Path redirectPath = bigCellHDFSPath(resPath); - Configuration hconf = HadoopUtil.getCurrentConfiguration(); + Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration(); FileSystem fileSystem = FileSystem.get(hconf); return fileSystem.open(redirectPath); @@ -297,7 +297,7 @@ public class HBaseResourceStore extends ResourceStore { private Path writeLargeCellToHdfs(String resPath, byte[] largeColumn, HTableInterface table) throws IOException { Path redirectPath = bigCellHDFSPath(resPath); - Configuration hconf = HadoopUtil.getCurrentConfiguration(); + Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration(); FileSystem fileSystem = FileSystem.get(hconf); if (fileSystem.exists(redirectPath)) { diff --git a/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java b/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java index fcefcf2..43b2f29 100644 --- a/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java +++ b/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java @@ -29,8 +29,10 @@ import java.util.regex.Pattern; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; +import org.apache.kylin.common.KylinConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,17 +41,40 @@ public class HadoopUtil { private static ThreadLocal hadoopConfig = new ThreadLocal<>(); + private static ThreadLocal hbaseConfig = new ThreadLocal<>(); + public static void setCurrentConfiguration(Configuration conf) { hadoopConfig.set(conf); } + public static void setCurrentHBaseConfiguration(Configuration conf) { + hbaseConfig.set(conf); + } + public static Configuration getCurrentConfiguration() { if (hadoopConfig.get() == null) { - hadoopConfig.set(new Configuration()); + Configuration configuration = new Configuration(); + String hadoopClusterFs = KylinConfig.getInstanceFromEnv().getHadoopClusterFs(); + if (hadoopClusterFs != null && !hadoopClusterFs.equals("")) { + configuration.set(FileSystem.FS_DEFAULT_NAME_KEY, hadoopClusterFs); + } + hadoopConfig.set(configuration); } return hadoopConfig.get(); } + public static Configuration getCurrentHBaseConfiguration() { + if (hbaseConfig.get() == null) { + Configuration configuration = HBaseConfiguration.create(new Configuration()); + String hbaseClusterFs = KylinConfig.getInstanceFromEnv().getHBaseClusterFs(); + if (hbaseClusterFs != null && !hbaseClusterFs.equals("")) { + configuration.set(FileSystem.FS_DEFAULT_NAME_KEY, hbaseClusterFs); + } + hbaseConfig.set(configuration); + } + return hbaseConfig.get(); + } + public static FileSystem getFileSystem(String path) throws IOException { return FileSystem.get(makeURI(path), getCurrentConfiguration()); } @@ -62,6 +87,24 @@ public class HadoopUtil { } } + public static String makeQualifiedPathInHadoopCluster(String path) { + try { + FileSystem fs = FileSystem.get(getCurrentConfiguration()); + return fs.makeQualified(new Path(path)).toString(); + } catch (IOException e) { + throw new IllegalArgumentException("Cannot create FileSystem from current hadoop cluster conf", e); + } + } + + public static String makeQualifiedPathInHBaseCluster(String path) { + try { + FileSystem fs = FileSystem.get(getCurrentHBaseConfiguration()); + return fs.makeQualified(new Path(path)).toString(); + } catch (IOException e) { + throw new IllegalArgumentException("Cannot create FileSystem from current hbase cluster conf", e); + } + } + /** * e.g. * 0. hbase (recommended way) @@ -116,6 +159,10 @@ public class HadoopUtil { conf.set(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, "60000"); // conf.set(ScannerCallable.LOG_SCANNER_ACTIVITY, "true"); + String hbaseClusterFs = KylinConfig.getInstanceFromEnv().getHBaseClusterFs(); + if (hbaseClusterFs != null && !hbaseClusterFs.equals("")) { + conf.set(FileSystem.FS_DEFAULT_NAME_KEY, hbaseClusterFs); + } return conf; } diff --git a/common/src/test/java/org/apache/kylin/common/persistence/HBaseResourceStoreTest.java b/common/src/test/java/org/apache/kylin/common/persistence/HBaseResourceStoreTest.java index c9e8063..75625fb 100644 --- a/common/src/test/java/org/apache/kylin/common/persistence/HBaseResourceStoreTest.java +++ b/common/src/test/java/org/apache/kylin/common/persistence/HBaseResourceStoreTest.java @@ -77,7 +77,7 @@ public class HBaseResourceStoreTest extends HBaseMetadataTestCase { assertEquals(content, t); Path redirectPath = ((HBaseResourceStore) store).bigCellHDFSPath(path); - Configuration hconf = HadoopUtil.getCurrentConfiguration(); + Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration(); FileSystem fileSystem = FileSystem.get(hconf); assertTrue(fileSystem.exists(redirectPath)); diff --git a/common/src/test/java/org/apache/kylin/common/util/HadoopUtilTest.java b/common/src/test/java/org/apache/kylin/common/util/HadoopUtilTest.java new file mode 100644 index 0000000..c380933 --- /dev/null +++ b/common/src/test/java/org/apache/kylin/common/util/HadoopUtilTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.common.util; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.kylin.common.KylinConfig; +import org.junit.After; +import org.junit.BeforeClass; +import org.junit.Test; + + +import static org.junit.Assert.*; + +/** + * Created by sunyerui on 15/8/26. + * Tests for HadoopUtil + */ +public class HadoopUtilTest { + + @BeforeClass + public static void beforeClass() { + System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/sandbox"); + } + + @After + public void after() { + HadoopUtil.setCurrentConfiguration(null); + HadoopUtil.setCurrentHBaseConfiguration(null); + } + + @Test + public void testGetCurrentConfiguration() throws Exception { + KylinConfig config = KylinConfig.getInstanceFromEnv(); + config.setProperty(KylinConfig.KYLIN_HADOOP_CLUSTER_FS, "hdfs://hadoop-cluster/"); + + Configuration conf = HadoopUtil.getCurrentConfiguration(); + assertEquals("hdfs://hadoop-cluster/", conf.get(FileSystem.FS_DEFAULT_NAME_KEY)); + } + + @Test + public void testGetCurrentHBaseConfiguration() throws Exception { + KylinConfig config = KylinConfig.getInstanceFromEnv(); + config.setProperty(KylinConfig.KYLIN_HBASE_CLUSTER_FS, "hdfs://hbase-cluster/"); + + Configuration conf = HadoopUtil.getCurrentHBaseConfiguration(); + assertEquals("hdfs://hbase-cluster/", conf.get(FileSystem.FS_DEFAULT_NAME_KEY)); + } + + @Test + public void testMakeQualifiedPathInHadoopCluster() throws Exception { + KylinConfig config = KylinConfig.getInstanceFromEnv(); + config.setProperty(KylinConfig.KYLIN_HADOOP_CLUSTER_FS, "file:/"); + + String path = HadoopUtil.makeQualifiedPathInHadoopCluster("/path/to/test/hadoop"); + assertEquals("file:/path/to/test/hadoop", path); + } + + @Test + public void testMakeQualifiedPathInHBaseCluster() throws Exception { + KylinConfig config = KylinConfig.getInstanceFromEnv(); + config.setProperty(KylinConfig.KYLIN_HBASE_CLUSTER_FS, "file:/"); + + String path = HadoopUtil.makeQualifiedPathInHBaseCluster("/path/to/test/hbase"); + assertEquals("file:/path/to/test/hbase", path); + } +} diff --git a/conf/kylin.properties b/conf/kylin.properties index e61aa59..8b0f10c 100644 --- a/conf/kylin.properties +++ b/conf/kylin.properties @@ -29,6 +29,14 @@ kylin.storage.url=hbase # Temp folder in hdfs, make sure user has the right access to the hdfs directory kylin.hdfs.working.dir=/kylin +# Hadoop Cluster FileSystem, which serving hive and mapreduce, format as hdfs://hadoop-cluster/ +# leave empty if using default fs configured by local core-site.xml +kylin.hadoop.cluster.fs= + +# HBase Cluster FileSystem, which serving hbase, format as hdfs://hbase-cluster/ +# leave empty if hbase running on same cluster with hive and mapreduce +kylin.hbase.cluster.fs= + kylin.job.mapreduce.default.reduce.input.mb=500 kylin.server.mode=all diff --git a/job/src/main/java/org/apache/kylin/job/AbstractJobBuilder.java b/job/src/main/java/org/apache/kylin/job/AbstractJobBuilder.java index e6fde23..96b87c5 100644 --- a/job/src/main/java/org/apache/kylin/job/AbstractJobBuilder.java +++ b/job/src/main/java/org/apache/kylin/job/AbstractJobBuilder.java @@ -20,6 +20,9 @@ package org.apache.kylin.job; import java.io.IOException; +import org.apache.hadoop.fs.FileSystem; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.job.common.ShellExecutable; import org.apache.kylin.job.constant.ExecutableConstants; import org.apache.kylin.job.engine.JobEngineConfig; @@ -62,6 +65,7 @@ public abstract class AbstractJobBuilder { protected AbstractExecutable createIntermediateHiveTableStep(IJoinedFlatTableDesc intermediateTableDesc, String jobId) { final String useDatabaseHql = "USE " + engineConfig.getConfig().getHiveDatabaseForIntermediateTable() + ";"; + final String setClusterHql = "-hiveconf " + FileSystem.FS_DEFAULT_NAME_KEY + "=\"" + HadoopUtil.getCurrentConfiguration().get(FileSystem.FS_DEFAULT_NAME_KEY) + "\""; final String dropTableHql = JoinedFlatTable.generateDropTableStatement(intermediateTableDesc, jobId); final String createTableHql = JoinedFlatTable.generateCreateTableStatement(intermediateTableDesc, getJobWorkingDir(jobId), jobId); String insertDataHqls; @@ -74,7 +78,9 @@ public abstract class AbstractJobBuilder { ShellExecutable step = new ShellExecutable(); StringBuffer buf = new StringBuffer(); - buf.append("hive -e \""); + buf.append("hive "); + buf.append(setClusterHql); + buf.append(" -e \""); buf.append(useDatabaseHql + "\n"); buf.append(dropTableHql + "\n"); buf.append(createTableHql + "\n"); diff --git a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java index 747ae3c..dd71cd8 100644 --- a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java +++ b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java @@ -25,6 +25,9 @@ import java.util.List; import java.util.TimeZone; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.job.AbstractJobBuilder; @@ -201,7 +204,7 @@ public final class CubingJobBuilder extends AbstractJobBuilder { AbstractExecutable addHTableSteps(CubeSegment seg, String cuboidRootPath, CubingJob result) { final String jobId = result.getId(); - final String cuboidPath = cuboidRootPath + "*"; + final String cuboidPath = HadoopUtil.makeQualifiedPathInHadoopCluster(cuboidRootPath + "*"); result.addTask(createRangeRowkeyDistributionStep(seg, cuboidPath)); // create htable step @@ -240,6 +243,8 @@ public final class CubingJobBuilder extends AbstractJobBuilder { if (jobConf != null && jobConf.length() > 0) { builder.append(" -conf ").append(jobConf); } + String setCluster = " -D" + FileSystem.FS_DEFAULT_NAME_KEY + "=" + HadoopUtil.getCurrentConfiguration().get(FileSystem.FS_DEFAULT_NAME_KEY); + builder.append(setCluster); } catch (IOException e) { throw new RuntimeException(e); } @@ -263,15 +268,15 @@ public final class CubingJobBuilder extends AbstractJobBuilder { } private String getRowkeyDistributionOutputPath(CubeSegment seg) { - return getJobWorkingDir(seg.getLastBuildJobID()) + "/" + seg.getCubeInstance().getName() + "/rowkey_stats"; + return HadoopUtil.makeQualifiedPathInHBaseCluster(getJobWorkingDir(seg.getLastBuildJobID()) + "/" + seg.getCubeInstance().getName() + "/rowkey_stats"); } private String getFactDistinctColumnsPath(CubeSegment seg, String jobUuid) { - return getJobWorkingDir(jobUuid) + "/" + seg.getCubeInstance().getName() + "/fact_distinct_columns"; + return HadoopUtil.makeQualifiedPathInHadoopCluster(getJobWorkingDir(jobUuid) + "/" + seg.getCubeInstance().getName() + "/fact_distinct_columns"); } private String getHFilePath(CubeSegment seg, String jobId) { - return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/hfile/"; + return HadoopUtil.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/hfile/"); } private MapReduceExecutable createFactDistinctColumnsStep(CubeSegment seg, String intermediateHiveTableName, String jobId) { diff --git a/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java b/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java index 4cb4a80..b4f6e8e 100644 --- a/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java +++ b/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java @@ -83,10 +83,14 @@ public class GarbageCollectionStep extends AbstractExecutable { private void dropHiveTable(ExecutableContext context) throws IOException { final String hiveTable = this.getOldHiveTable(); if (StringUtils.isNotEmpty(hiveTable)) { - final String dropSQL = "DROP TABLE IF EXISTS " + hiveTable + ";"; - final String dropHiveCMD = "hive -e \"" + dropSQL + "\""; + final String dropSQL = "USE " + KylinConfig.getInstanceFromEnv().getHiveDatabaseForIntermediateTable() + ";" + " DROP TABLE IF EXISTS " + hiveTable + ";"; + final String setClusterHql = "-hiveconf " + FileSystem.FS_DEFAULT_NAME_KEY + "=\"" + HadoopUtil.getCurrentConfiguration().get(FileSystem.FS_DEFAULT_NAME_KEY) + "\""; + final String dropHiveCMD = "hive " + setClusterHql + " -e \"" + dropSQL + "\""; + logger.info("executing: " + dropHiveCMD); ShellCmdOutput shellCmdOutput = new ShellCmdOutput(); context.getConfig().getCliCommandExecutor().execute(dropHiveCMD, shellCmdOutput); + logger.debug("Dropped Hive table " + hiveTable + " \n"); + output.append(shellCmdOutput.getOutput() + " \n"); output.append("Dropped Hive table " + hiveTable + " \n"); } @@ -129,27 +133,31 @@ public class GarbageCollectionStep extends AbstractExecutable { } } - private void dropHdfsPath(ExecutableContext context) throws IOException { + private void dropFileSystemPath(FileSystem fs, Path p) throws IOException { + Path path = fs.makeQualified(p); + if (fs.exists(path)) { + fs.delete(path, true); + logger.debug("Dropped HDFS path: " + path); + output.append("Dropped HDFS path \"" + path + "\" \n"); + } else { + logger.debug("HDFS path not exists: " + path); + output.append("HDFS path not exists: \"" + path + "\" \n"); + } + } + private void dropHdfsPath(ExecutableContext context) throws IOException { List oldHdfsPaths = this.getOldHdsfPaths(); if (oldHdfsPaths != null && oldHdfsPaths.size() > 0) { - Configuration hconf = HadoopUtil.getCurrentConfiguration(); - FileSystem fileSystem = FileSystem.get(hconf); + FileSystem hadoopFs = FileSystem.get(HadoopUtil.getCurrentConfiguration()); + FileSystem hbaseFs = FileSystem.get(HadoopUtil.getCurrentHBaseConfiguration()); for (String path : oldHdfsPaths) { if (path.endsWith("*")) path = path.substring(0, path.length() - 1); Path oldPath = new Path(path); - if (fileSystem.exists(oldPath)) { - fileSystem.delete(oldPath, true); - logger.debug("Dropped HDFS path: " + path); - output.append("Dropped HDFS path \"" + path + "\" \n"); - } else { - logger.debug("HDFS path not exists: " + path); - output.append("HDFS path not exists: \"" + path + "\" \n"); - } + dropFileSystemPath(hadoopFs, oldPath); + dropFileSystemPath(hbaseFs, oldPath); } - } } diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java index 6ad89d6..a995649 100644 --- a/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java +++ b/job/src/main/java/org/apache/kylin/job/hadoop/AbstractHadoopJob.java @@ -295,7 +295,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool { } protected void deletePath(Configuration conf, Path path) throws IOException { - FileSystem fs = FileSystem.get(conf); + FileSystem fs = FileSystem.get(path.toUri(), conf); if (fs.exists(path)) { fs.delete(path, true); } diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/BulkLoadJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/BulkLoadJob.java index 2608085..692d53e 100644 --- a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/BulkLoadJob.java +++ b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/BulkLoadJob.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; import org.apache.hadoop.util.ToolRunner; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.model.CubeDesc; @@ -59,7 +60,7 @@ public class BulkLoadJob extends AbstractHadoopJob { // end with "/" String input = getOptionValue(OPTION_INPUT_PATH); - Configuration conf = HBaseConfiguration.create(getConf()); + Configuration conf = HadoopUtil.getCurrentHBaseConfiguration(); FileSystem fs = FileSystem.get(conf); String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase(); diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java index f114b5b..027c0ca 100644 --- a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java +++ b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java @@ -42,6 +42,7 @@ import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.ToolRunner; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.model.CubeDesc; @@ -79,7 +80,7 @@ public class CreateHTableJob extends AbstractHadoopJob { tableDesc.setValue(HTableDescriptor.SPLIT_POLICY, ConstantSizeRegionSplitPolicy.class.getName()); tableDesc.setValue(IRealizationConstants.HTableTag, config.getMetadataUrlPrefix()); - Configuration conf = HBaseConfiguration.create(getConf()); + Configuration conf = HadoopUtil.getCurrentHBaseConfiguration(); HBaseAdmin admin = new HBaseAdmin(conf); try { diff --git a/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java b/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java index a28477e..89472b2 100644 --- a/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java +++ b/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java @@ -106,7 +106,7 @@ public class DeployCoprocessorCLI { private static void initHTableCoprocessor(HTableDescriptor desc) throws IOException { KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); - Configuration hconf = HadoopUtil.getCurrentConfiguration(); + Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration(); FileSystem fileSystem = FileSystem.get(hconf); String localCoprocessorJar = kylinConfig.getCoprocessorLocalJar(); -- 2.3.2 (Apple Git-55) From b490da28d642256090df8c7a3b5c8e5a2045379c Mon Sep 17 00:00:00 2001 From: sunyerui Date: Fri, 28 Aug 2015 18:00:58 +0800 Subject: [PATCH 2/2] KYLIN-957 Support HBase in a separate cluster, minor fix --- bin/check-env.sh | 5 +++-- bin/sample.sh | 6 ++++++ 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/bin/check-env.sh b/bin/check-env.sh index 1597c53..89a4aed 100644 --- a/bin/check-env.sh +++ b/bin/check-env.sh @@ -46,9 +46,10 @@ then fi WORKING_DIR=`sh $KYLIN_HOME/bin/get-properties.sh kylin.hdfs.working.dir` -hadoop fs -mkdir -p $WORKING_DIR +HADOOP_FS=`sh $KYLIN_HOME/bin/get-properties.sh kylin.hadoop.cluster.fs` +hadoop fs -mkdir -p $HADOOP_FS/$WORKING_DIR if [ $? != 0 ] then - echo "failed to create $WORKING_DIR, Please make sure the user has right to access $WORKING_DIR" + echo "failed to create $HADOOP_FS/$WORKING_DIR, Please make sure the user has right to access $HADOOP_FS/$WORKING_DIR" exit 1 fi diff --git a/bin/sample.sh b/bin/sample.sh index 4843529..7c4999f 100644 --- a/bin/sample.sh +++ b/bin/sample.sh @@ -20,9 +20,15 @@ dir=$(dirname ${0}) source ${dir}/check-env.sh job_jar=`find ${KYLIN_HOME}/lib/ -name kylin-job*.jar` +HADOOP_FS=`sh $KYLIN_HOME/bin/get-properties.sh kylin.hadoop.cluster.fs` echo "Going to create sample tables in hive..." cd ${KYLIN_HOME}/sample_cube/data +if [ -z $HADOOP_FS ];then hive -f ${KYLIN_HOME}/sample_cube/create_sample_tables.sql || { exit 1; } +else +hive -hiveconf fs.defaultFS=${HADOOP_FS} -f ${KYLIN_HOME}/sample_cube/create_sample_tables.sql || { exit 1; } +fi + echo "Sample hive tables are created successfully; Going to create sample cube..." cd ${KYLIN_HOME} hbase org.apache.hadoop.util.RunJar ${job_jar} org.apache.kylin.common.persistence.ResourceTool upload ${KYLIN_HOME}/sample_cube/metadata || { exit 1; } -- 2.3.2 (Apple Git-55)