From bddcd845f5776df271d17ec0068847ffcba2362d Mon Sep 17 00:00:00 2001 From: sunyerui Date: Wed, 16 Sep 2015 00:03:29 +0800 Subject: [PATCH] KYLIN-957 Support HBase in a separate cluster --- conf/kylin.properties | 4 +++ .../java/org/apache/kylin/common/KylinConfig.java | 6 ++++ engine-mr/pom.xml | 5 ++++ .../org/apache/kylin/engine/mr/HadoopUtil.java | 32 +++++++++++++++++++++- .../kylin/storage/hbase/HBaseResourceStore.java | 4 +-- .../kylin/storage/hbase/steps/BulkLoadJob.java | 3 +- .../kylin/storage/hbase/steps/CreateHTableJob.java | 2 +- .../kylin/storage/hbase/steps/HBaseConnection.java | 8 ++++++ .../kylin/storage/hbase/steps/HBaseMRSteps.java | 5 ++-- .../storage/hbase/util/DeployCoprocessorCLI.java | 2 +- .../hbase/steps/ITHBaseResourceStoreTest.java | 2 +- 11 files changed, 64 insertions(+), 9 deletions(-) diff --git a/conf/kylin.properties b/conf/kylin.properties index 8dfb05b..5b56f31 100644 --- a/conf/kylin.properties +++ b/conf/kylin.properties @@ -12,6 +12,10 @@ kylin.storage.url=hbase # Temp folder in hdfs, make sure user has the right access to the hdfs directory kylin.hdfs.working.dir=/kylin +# HBase cluster FileSystem, i.e. the filesystem serving HBase, in the format hdfs://hbase-cluster:8020 +# Leave empty if HBase runs on the same cluster as Hive and MapReduce +kylin.hbase.cluster.fs= + kylin.job.mapreduce.default.reduce.input.mb=500 # If true, job engine will not assume that hadoop CLI reside on the same server as it self diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java index db213f7..6b99ba4 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java @@ -104,6 +104,8 @@ public class KylinConfig implements Serializable { public static final String KYLIN_HDFS_WORKING_DIR = "kylin.hdfs.working.dir"; + public static final String KYLIN_HBASE_CLUSTER_FS = 
"kylin.hbase.cluster.fs"; + public static final String HIVE_DATABASE_FOR_INTERMEDIATE_TABLE = "kylin.job.hive.database.for.intermediatetable"; public static final String HIVE_PASSWORD = "hive.password"; @@ -293,6 +295,10 @@ public class KylinConfig implements Serializable { return root + getMetadataUrlPrefix() + "/"; } + public String getHBaseClusterFs() { + return getOptional(KYLIN_HBASE_CLUSTER_FS, ""); + } + public String getKylinJobLogDir() { return getOptional(KYLIN_JOB_LOG_DIR, "/tmp/kylin/logs"); } diff --git a/engine-mr/pom.xml b/engine-mr/pom.xml index e00a693..7a2bfe5 100644 --- a/engine-mr/pom.xml +++ b/engine-mr/pom.xml @@ -102,6 +102,11 @@ provided + org.apache.hbase + hbase-common + provided + + org.apache.mrunit mrunit hadoop2 diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java index 1c00993..7fcbf1f 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/HadoopUtil.java @@ -24,16 +24,25 @@ import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.kylin.common.KylinConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class HadoopUtil { + private static final Logger logger = LoggerFactory.getLogger(HadoopUtil.class); private static ThreadLocal hadoopConfig = new ThreadLocal<>(); + private static ThreadLocal hbaseConfig = new ThreadLocal<>(); + public static void setCurrentConfiguration(Configuration conf) { hadoopConfig.set(conf); } @@ -45,6 +54,18 @@ public class HadoopUtil 
{ return hadoopConfig.get(); } + public static Configuration getCurrentHBaseConfiguration() { + if (hbaseConfig.get() == null) { + Configuration configuration = HBaseConfiguration.create(new Configuration()); + String hbaseClusterFs = KylinConfig.getInstanceFromEnv().getHBaseClusterFs(); + if (StringUtils.isNotEmpty(hbaseClusterFs)) { + configuration.set(FileSystem.FS_DEFAULT_NAME_KEY, hbaseClusterFs); + } + hbaseConfig.set(configuration); + } + return hbaseConfig.get(); + } + public static FileSystem getFileSystem(String path) throws IOException { return FileSystem.get(makeURI(path), getCurrentConfiguration()); } @@ -57,6 +78,15 @@ public class HadoopUtil { } } + public static String makeQualifiedPathInHBaseCluster(String path) { + try { + FileSystem fs = FileSystem.get(getCurrentHBaseConfiguration()); + return fs.makeQualified(new Path(path)).toString(); + } catch (IOException e) { + throw new IllegalArgumentException("Cannot create FileSystem from current hbase cluster conf", e); + } + } + public static String fixWindowsPath(String path) { // fix windows path if (path.startsWith("file://") && !path.startsWith("file:///") && path.contains(":\\")) { @@ -87,7 +117,7 @@ public class HadoopUtil { } public static void deletePath(Configuration conf, Path path) throws IOException { - FileSystem fs = FileSystem.get(conf); + FileSystem fs = FileSystem.get(path.toUri(), conf); if (fs.exists(path)) { fs.delete(path, true); } diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java index 97ee64d..1cafd66 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java @@ -186,7 +186,7 @@ public class HBaseResourceStore extends ResourceStore { byte[] value = r.getValue(B_FAMILY, B_COLUMN); if (value.length == 0) { Path redirectPath = 
bigCellHDFSPath(resPath); - Configuration hconf = HadoopUtil.getCurrentConfiguration(); + Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration(); FileSystem fileSystem = FileSystem.get(hconf); return fileSystem.open(redirectPath); @@ -305,7 +305,7 @@ public class HBaseResourceStore extends ResourceStore { private Path writeLargeCellToHdfs(String resPath, byte[] largeColumn, HTableInterface table) throws IOException { Path redirectPath = bigCellHDFSPath(resPath); - Configuration hconf = HadoopUtil.getCurrentConfiguration(); + Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration(); FileSystem fileSystem = FileSystem.get(hconf); if (fileSystem.exists(redirectPath)) { diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/BulkLoadJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/BulkLoadJob.java index 2be61f4..cbddfae 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/BulkLoadJob.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/BulkLoadJob.java @@ -31,6 +31,7 @@ import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.cube.model.HBaseColumnFamilyDesc; +import org.apache.kylin.engine.mr.HadoopUtil; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,7 +61,7 @@ public class BulkLoadJob extends AbstractHadoopJob { // end with "/" String input = getOptionValue(OPTION_INPUT_PATH); - Configuration conf = HBaseConfiguration.create(getConf()); + Configuration conf = HadoopUtil.getCurrentHBaseConfiguration(); FileSystem fs = FileSystem.get(conf); String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase(); diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java index 35a35c1..eb1256c 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java @@ -103,7 +103,7 @@ public class CreateHTableJob extends AbstractHadoopJob { CubeSegment cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW); String tableName = getOptionValue(OPTION_HTABLE_NAME).toUpperCase(); - Configuration conf = HBaseConfiguration.create(getConf()); + Configuration conf = HadoopUtil.getCurrentHBaseConfiguration(); try { diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseConnection.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseConnection.java index abc4273..2f7cffb 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseConnection.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseConnection.java @@ -28,6 +28,7 @@ import java.util.regex.Pattern; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; @@ -36,6 +37,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.StorageException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,6 +82,12 @@ public class HBaseConnection { conf.set(HConstants.HBASE_CLIENT_PAUSE, "3000"); conf.set(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "5"); conf.set(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, "60000"); + + String 
hbaseClusterFs = KylinConfig.getInstanceFromEnv().getHBaseClusterFs(); + if (StringUtils.isNotEmpty(hbaseClusterFs)) { + conf.set(FileSystem.FS_DEFAULT_NAME_KEY, hbaseClusterFs); + } + // conf.set(ScannerCallable.LOG_SCANNER_ACTIVITY, "true"); if (StringUtils.isEmpty(url)) { return conf; diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java index 03b4361..dfb4f33 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java @@ -4,6 +4,7 @@ import java.util.List; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.engine.mr.CubingJob; +import org.apache.kylin.engine.mr.HadoopUtil; import org.apache.kylin.engine.mr.JobBuilderSupport; import org.apache.kylin.engine.mr.common.HadoopShellExecutable; import org.apache.kylin.engine.mr.common.MapReduceExecutable; @@ -129,11 +130,11 @@ public class HBaseMRSteps extends JobBuilderSupport { } public String getHFilePath(String jobId) { - return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/hfile/"; + return HadoopUtil.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/hfile/"); } public String getRowkeyDistributionOutputPath(String jobId) { - return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/rowkey_stats"; + return HadoopUtil.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/rowkey_stats"); } } diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java index 5bbc3e1..31563ce 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java +++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java @@ -109,7 +109,7 @@ public class DeployCoprocessorCLI { private static void initHTableCoprocessor(HTableDescriptor desc) throws IOException { KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); - Configuration hconf = HadoopUtil.getCurrentConfiguration(); + Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration(); FileSystem fileSystem = FileSystem.get(hconf); String localCoprocessorJar = kylinConfig.getCoprocessorLocalJar(); diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/ITHBaseResourceStoreTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/ITHBaseResourceStoreTest.java index e1976cb..ba95176 100644 --- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/ITHBaseResourceStoreTest.java +++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/ITHBaseResourceStoreTest.java @@ -80,7 +80,7 @@ public class ITHBaseResourceStoreTest extends HBaseMetadataTestCase { assertEquals(content, t); Path redirectPath = ((HBaseResourceStore) store).bigCellHDFSPath(path); - Configuration hconf = HadoopUtil.getCurrentConfiguration(); + Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration(); FileSystem fileSystem = FileSystem.get(hconf); assertTrue(fileSystem.exists(redirectPath)); -- 2.3.2 (Apple Git-55)