From 311e6a972f70af13e6e56fad9f8d5462325d4010 Mon Sep 17 00:00:00 2001 From: murkrishn Date: Thu, 22 Oct 2015 03:01:00 -0700 Subject: [PATCH] Code level changes for accommodating upgrade of HBase Client API. Reference jira - https://issues.apache.org/jira/browse/KYLIN-782 --- .../kylin/common/persistence/HBaseConnection.java | 22 ++++---- .../common/persistence/HBaseResourceStore.java | 31 ++++++------ .../common/util/HBaseRegionSizeCalculator.java | 27 ++++++---- .../apache/kylin/common/util/BasicHadoopTest.java | 17 +++++-- .../kylin/job/cube/GarbageCollectionStep.java | 25 ++++++---- .../apache/kylin/job/hadoop/cube/CubeHFileJob.java | 21 ++++++-- .../kylin/job/hadoop/cube/StorageCleanupJob.java | 17 +++++-- .../kylin/job/hadoop/hbase/CreateHTableJob.java | 13 +++-- .../job/hadoop/invertedindex/IICreateHFileJob.java | 26 ++++++++-- .../hadoop/invertedindex/IICreateHTableJob.java | 15 ++++-- .../org/apache/kylin/job/tools/CleanHtableCLI.java | 11 +++- .../apache/kylin/job/tools/CubeMigrationCLI.java | 27 ++++++---- .../kylin/job/tools/DeployCoprocessorCLI.java | 26 ++++++---- .../kylin/job/tools/GridTableHBaseBenchmark.java | 39 +++++++-------- .../kylin/job/tools/HtableAlterMetadataCLI.java | 11 +++- .../org/apache/kylin/job/tools/RowCounterCLI.java | 11 ++-- .../java/org/apache/kylin/job/ExportHBaseData.java | 14 +++--- .../kylin/job/hadoop/hbase/TestHbaseClient.java | 18 +++++-- .../apache/kylin/job/tools/HBaseRowDigestTest.java | 13 ++--- monitor/pom.xml | 6 +++ .../apache/kylin/monitor/MonitorMetaManager.java | 58 +++++++++++++++------- pom.xml | 2 +- .../org/apache/kylin/rest/service/AclService.java | 38 +++++++------- .../org/apache/kylin/rest/service/CubeService.java | 37 ++++++-------- .../apache/kylin/rest/service/QueryService.java | 21 ++++---- .../org/apache/kylin/rest/service/UserService.java | 27 +++++----- .../storage/hbase/CubeSegmentTupleIterator.java | 11 ++-- .../kylin/storage/hbase/CubeStorageEngine.java | 4 +- .../kylin/storage/hbase/HBaseClientKVIterator.java | 11 ++-- .../storage/hbase/InvertedIndexStorageEngine.java | 4 +- .../apache/kylin/storage/hbase/PingHBaseCLI.java | 15 +++--- .../hbase/SerializedHBaseTupleIterator.java | 4 +- .../endpoint/EndpointTupleIterator.java | 13 ++--- .../coprocessor/observer/ObserverEnabler.java | 4 +- .../storage/hbase/InvertedIndexHBaseTest.java | 10 ++-- 35 files changed, 388 insertions(+), 261 deletions(-) diff --git a/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java b/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java index c4d0314..ae128d9 100644 --- a/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java +++ b/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java @@ -27,9 +27,9 @@ import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.kylin.common.util.HadoopUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,13 +43,13 @@ public class HBaseConnection { private static final Logger logger = LoggerFactory.getLogger(HBaseConnection.class); 
private static final Map ConfigCache = new ConcurrentHashMap(); - private static final Map ConnPool = new ConcurrentHashMap(); + private static final Map ConnPool = new ConcurrentHashMap(); static { Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { - for (HConnection conn : ConnPool.values()) { + for (Connection conn : ConnPool.values()) { try { conn.close(); } catch (IOException e) { @@ -60,7 +60,7 @@ public class HBaseConnection { }); } - public static HConnection get(String url) { + public static Connection get(String url) { // find configuration Configuration conf = ConfigCache.get(url); if (conf == null) { @@ -68,11 +68,11 @@ public class HBaseConnection { ConfigCache.put(url, conf); } - HConnection connection = ConnPool.get(url); + Connection connection = ConnPool.get(url); try { // I don't use DCL since recreate a connection is not a big issue. if (connection == null) { - connection = HConnectionManager.createConnection(conf); + connection = ConnectionFactory.createConnection(conf); ConnPool.put(url, connection); } } catch (Throwable t) { @@ -86,9 +86,9 @@ public class HBaseConnection { createHTableIfNeeded(HBaseConnection.get(hbaseUrl), tableName, families); } - public static void createHTableIfNeeded(HConnection conn, String tableName, String... families) throws IOException { - HBaseAdmin hbase = new HBaseAdmin(conn); - + public static void createHTableIfNeeded(Connection conn, String tableName, String... families) throws IOException { + Admin hbase = conn.getAdmin(); + try { boolean tableExist = false; try { diff --git a/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java b/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java index 1c4a7ba..0433125 100644 --- a/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java +++ b/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java @@ -33,13 +33,14 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.filter.KeyOnlyFilter; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.Bytes; @@ -76,7 +77,7 @@ public class HBaseResourceStore extends ResourceStore { // final Map tableNameMap; // path prefix ==> HBase table name - private HConnection getConnection() throws IOException { + private Connection getConnection() throws IOException { return HBaseConnection.get(hbaseUrl); } @@ -119,7 +120,7 @@ public class HBaseResourceStore extends ResourceStore { ArrayList result = new ArrayList(); - HTableInterface table = getConnection().getTable(getAllInOneTableName()); + Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName())); Scan scan = new Scan(startRow, endRow); scan.setFilter(new KeyOnlyFilter()); try { @@ -154,7 +155,7 @@ public class HBaseResourceStore extends ResourceStore { scan.addColumn(B_FAMILY, B_COLUMN_TS); scan.addColumn(B_FAMILY, 
B_COLUMN); - HTableInterface table = getConnection().getTable(getAllInOneTableName()); + Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName())); List result = Lists.newArrayList(); try { ResultScanner scanner = table.getScanner(scan); @@ -214,13 +215,12 @@ public class HBaseResourceStore extends ResourceStore { IOUtils.copy(content, bout); bout.close(); - HTableInterface table = getConnection().getTable(getAllInOneTableName()); + Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName())); try { byte[] row = Bytes.toBytes(resPath); Put put = buildPut(resPath, ts, row, bout.toByteArray(), table); table.put(put); - table.flushCommits(); } finally { IOUtils.closeQuietly(table); } @@ -228,7 +228,7 @@ public class HBaseResourceStore extends ResourceStore { @Override protected long checkAndPutResourceImpl(String resPath, byte[] content, long oldTS, long newTS) throws IOException, IllegalStateException { - HTableInterface table = getConnection().getTable(getAllInOneTableName()); + Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName())); try { byte[] row = Bytes.toBytes(resPath); byte[] bOldTS = oldTS == 0 ? null : Bytes.toBytes(oldTS); @@ -240,8 +240,6 @@ public class HBaseResourceStore extends ResourceStore { throw new IllegalStateException("Overwriting conflict " + resPath + ", expect old TS " + real + ", but it is " + oldTS); } - table.flushCommits(); - return newTS; } finally { IOUtils.closeQuietly(table); @@ -250,11 +248,10 @@ public class HBaseResourceStore extends ResourceStore { @Override protected void deleteResourceImpl(String resPath) throws IOException { - HTableInterface table = getConnection().getTable(getAllInOneTableName()); + Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName())); try { Delete del = new Delete(Bytes.toBytes(resPath)); table.delete(del); - table.flushCommits(); } finally { IOUtils.closeQuietly(table); } @@ -276,7 +273,7 @@ public class HBaseResourceStore extends ResourceStore { scan.addColumn(family, column); } - HTableInterface table = getConnection().getTable(getAllInOneTableName()); + Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName())); try { ResultScanner scanner = table.getScanner(scan); Result result = null; @@ -295,7 +292,7 @@ public class HBaseResourceStore extends ResourceStore { return endRow; } - private Path writeLargeCellToHdfs(String resPath, byte[] largeColumn, HTableInterface table) throws IOException { + private Path writeLargeCellToHdfs(String resPath, byte[] largeColumn, Table table) throws IOException { Path redirectPath = bigCellHDFSPath(resPath); Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration(); FileSystem fileSystem = FileSystem.get(hconf); @@ -321,7 +318,7 @@ public class HBaseResourceStore extends ResourceStore { return redirectPath; } - private Put buildPut(String resPath, long ts, byte[] row, byte[] content, HTableInterface table) throws IOException { + private Put buildPut(String resPath, long ts, byte[] row, byte[] content, Table table) throws IOException { int kvSizeLimit = this.kylinConfig.getHBaseKeyValueSize(); if (content.length > kvSizeLimit) { writeLargeCellToHdfs(resPath, content, table); @@ -329,8 +326,8 @@ public class HBaseResourceStore extends ResourceStore { } Put put = new Put(row); - put.add(B_FAMILY, B_COLUMN, content); - put.add(B_FAMILY, B_COLUMN_TS, Bytes.toBytes(ts)); + put.addColumn(B_FAMILY, B_COLUMN, content); + put.addColumn(B_FAMILY, B_COLUMN_TS, Bytes.toBytes(ts)); 
return put; } diff --git a/common/src/main/java/org/apache/kylin/common/util/HBaseRegionSizeCalculator.java b/common/src/main/java/org/apache/kylin/common/util/HBaseRegionSizeCalculator.java index 093ac9e..b9ee196 100644 --- a/common/src/main/java/org/apache/kylin/common/util/HBaseRegionSizeCalculator.java +++ b/common/src/main/java/org/apache/kylin/common/util/HBaseRegionSizeCalculator.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; @@ -30,12 +31,15 @@ import java.util.TreeSet; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ClusterStatus; -import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.RegionLoad; import org.apache.hadoop.hbase.ServerLoad; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.client.Table; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,12 +57,12 @@ public class HBaseRegionSizeCalculator { /** * Computes size of each region for table and given column families. * */ - public HBaseRegionSizeCalculator(HTable table) throws IOException { - this(table, new HBaseAdmin(table.getConfiguration())); + public HBaseRegionSizeCalculator(String tableName , Connection hbaseConnection) throws IOException { + this(hbaseConnection.getTable(TableName.valueOf(tableName)), hbaseConnection.getAdmin(), hbaseConnection); } /** Constructor for unit testing */ - HBaseRegionSizeCalculator(HTable table, HBaseAdmin hBaseAdmin) throws IOException { + HBaseRegionSizeCalculator(Table table, Admin hBaseAdmin, Connection hbaseConnection) throws IOException { try { if (!enabled(table.getConfiguration())) { @@ -66,14 +70,15 @@ public class HBaseRegionSizeCalculator { return; } - logger.info("Calculating region sizes for table \"" + new String(table.getTableName()) + "\"."); + logger.info("Calculating region sizes for table \"" + new String(table.getName().toString()) + "\"."); // Get regions for table. 
- Set tableRegionInfos = table.getRegionLocations().keySet(); + RegionLocator regionLocator = hbaseConnection.getRegionLocator(table.getName()); + List regionLocationList = regionLocator.getAllRegionLocations(); Set tableRegions = new TreeSet(Bytes.BYTES_COMPARATOR); - for (HRegionInfo regionInfo : tableRegionInfos) { - tableRegions.add(regionInfo.getRegionName()); + for (HRegionLocation hRegionLocation : regionLocationList) { + tableRegions.add(hRegionLocation.getRegionInfo().getRegionName()); } ClusterStatus clusterStatus = hBaseAdmin.getClusterStatus(); @@ -124,4 +129,4 @@ public class HBaseRegionSizeCalculator { public Map getRegionSizeMap() { return Collections.unmodifiableMap(sizeMap); } -} +} \ No newline at end of file diff --git a/common/src/test/java/org/apache/kylin/common/util/BasicHadoopTest.java b/common/src/test/java/org/apache/kylin/common/util/BasicHadoopTest.java index 6d2762c..5fb9f79 100644 --- a/common/src/test/java/org/apache/kylin/common/util/BasicHadoopTest.java +++ b/common/src/test/java/org/apache/kylin/common/util/BasicHadoopTest.java @@ -21,12 +21,17 @@ package org.apache.kylin.common.util; import java.io.File; import java.io.IOException; +import org.apache.kylin.common.KylinConfig; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.kylin.common.persistence.HBaseConnection; import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; @@ -57,7 +62,10 @@ public class BasicHadoopTest { tableDesc.addFamily(cf); Configuration conf = HBaseConfiguration.create(); - HBaseAdmin admin = new HBaseAdmin(conf); + //TODO: to check + String hbaseZookeeperQuorum = conf.get(HConstants.ZOOKEEPER_QUORUM).startsWith("hbase") ? conf.get(HConstants.ZOOKEEPER_QUORUM) : KylinConfig.getInstanceFromEnv().getStorageUrl(); + Connection connection = HBaseConnection.get(hbaseZookeeperQuorum); + Admin admin = connection.getAdmin(); admin.createTable(tableDesc); admin.close(); } @@ -65,7 +73,10 @@ public class BasicHadoopTest { @Test public void testRetriveHtableHost() throws IOException { Configuration conf = HBaseConfiguration.create(); - HBaseAdmin hbaseAdmin = new HBaseAdmin(conf); + //TODO: to check + String hbaseZookeeperQuorum = conf.get(HConstants.ZOOKEEPER_QUORUM).startsWith("hbase") ? 
conf.get(HConstants.ZOOKEEPER_QUORUM) : KylinConfig.getInstanceFromEnv().getStorageUrl(); + Connection connection = HBaseConnection.get(hbaseZookeeperQuorum); + Admin hbaseAdmin = connection.getAdmin(); HTableDescriptor[] tableDescriptors = hbaseAdmin.listTables(); for (HTableDescriptor table : tableDescriptors) { String value = table.getValue("KYLIN_HOST"); diff --git a/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java b/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java index f2f1fc0..1aea2af 100644 --- a/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java +++ b/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java @@ -28,10 +28,13 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.common.persistence.HBaseConnection; import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.job.cmd.ShellCmdOutput; import org.apache.kylin.job.exception.ExecuteException; @@ -100,18 +103,22 @@ public class GarbageCollectionStep extends AbstractExecutable { if (oldTables != null && oldTables.size() > 0) { String metadataUrlPrefix = KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix(); Configuration conf = HBaseConfiguration.create(); - HBaseAdmin admin = null; + //TODO: to check + String hbaseZookeeperQuorum = conf.get(HConstants.ZOOKEEPER_QUORUM).startsWith("hbase") ? 
conf.get(HConstants.ZOOKEEPER_QUORUM) : KylinConfig.getInstanceFromEnv().getStorageUrl(); + Connection connection = null; + Admin admin = null; try { - admin = new HBaseAdmin(conf); + connection = HBaseConnection.get(hbaseZookeeperQuorum); + admin = connection.getAdmin(); for (String table : oldTables) { - if (admin.tableExists(table)) { - HTableDescriptor tableDescriptor = admin.getTableDescriptor(Bytes.toBytes(table)); + if (admin.tableExists(TableName.valueOf(table))) { + HTableDescriptor tableDescriptor = admin.getTableDescriptor(TableName.valueOf(table)); String host = tableDescriptor.getValue(IRealizationConstants.HTableTag); if (metadataUrlPrefix.equalsIgnoreCase(host)) { - if (admin.isTableEnabled(table)) { - admin.disableTable(table); + if (admin.isTableEnabled(TableName.valueOf(table))) { + admin.disableTable(TableName.valueOf(table)); } - admin.deleteTable(table); + admin.deleteTable(TableName.valueOf(table)); logger.debug("Dropped HBase table " + table); output.append("Dropped HBase table " + table + " \n"); } else { diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java index 3c1e4a5..422e416 100644 --- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java +++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java @@ -19,11 +19,16 @@ package org.apache.kylin.job.hadoop.cube; import org.apache.commons.cli.Options; +import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2; import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.mapreduce.Job; @@ -31,6 +36,7 @@ import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.ToolRunner; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.persistence.HBaseConnection; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.job.constant.BatchConstants; @@ -47,6 +53,8 @@ public class CubeHFileJob extends AbstractHadoopJob { public int run(String[] args) throws Exception { Options options = new Options(); + Connection connection = null; + Table table = null; try { options.addOption(OPTION_JOB_NAME); @@ -80,10 +88,14 @@ public class CubeHFileJob extends AbstractHadoopJob { attachKylinPropsAndMetadata(cube, job.getConfiguration()); String tableName = getOptionValue(OPTION_HTABLE_NAME).toUpperCase(); - HTable htable = new HTable(conf, tableName); + //TODO: to check + String hbaseZookeeperQuorum = conf.get(HConstants.ZOOKEEPER_QUORUM).startsWith("hbase") ? 
conf.get(HConstants.ZOOKEEPER_QUORUM) : KylinConfig.getInstanceFromEnv().getStorageUrl(); + connection = HBaseConnection.get(hbaseZookeeperQuorum); + table = connection.getTable(TableName.valueOf(tableName)); + RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(tableName)); //Automatic config ! - HFileOutputFormat.configureIncrementalLoad(job, htable); + HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator); // set block replication to 3 for hfiles conf.set(DFSConfigKeys.DFS_REPLICATION_KEY, "3"); @@ -96,6 +108,7 @@ public class CubeHFileJob extends AbstractHadoopJob { printUsage(options); throw e; } finally { + IOUtils.closeQuietly(table); if (job != null) cleanupTempConfFile(job.getConfiguration()); } diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java index 3b25ee1..edb3dfd 100644 --- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java +++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java @@ -26,12 +26,16 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.MasterNotRunningException; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.ZooKeeperConnectionException; -import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.util.ToolRunner; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.persistence.HBaseConnection; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; @@ -107,7 +111,10 @@ public class StorageCleanupJob extends AbstractHadoopJob { IIManager iiManager = IIManager.getInstance(KylinConfig.getInstanceFromEnv()); // get all kylin hbase tables - HBaseAdmin hbaseAdmin = new HBaseAdmin(conf); + //TODO: to check + String hbaseZookeeperQuorum = conf.get(HConstants.ZOOKEEPER_QUORUM).startsWith("hbase") ? 
conf.get(HConstants.ZOOKEEPER_QUORUM) : KylinConfig.getInstanceFromEnv().getStorageUrl(); + Connection connection = HBaseConnection.get(hbaseZookeeperQuorum); + Admin hbaseAdmin = connection.getAdmin(); String tableNamePrefix = IRealizationConstants.SharedHbaseStorageLocationPrefix; HTableDescriptor[] tableDescriptors = hbaseAdmin.listTables(tableNamePrefix + ".*"); List allTablesNeedToBeDropped = new ArrayList(); @@ -141,9 +148,9 @@ public class StorageCleanupJob extends AbstractHadoopJob { // drop tables for (String htableName : allTablesNeedToBeDropped) { log.info("Deleting HBase table " + htableName); - if (hbaseAdmin.tableExists(htableName)) { - hbaseAdmin.disableTable(htableName); - hbaseAdmin.deleteTable(htableName); + if (hbaseAdmin.tableExists(TableName.valueOf(htableName))) { + hbaseAdmin.disableTable(TableName.valueOf(htableName)); + hbaseAdmin.deleteTable(TableName.valueOf(htableName)); log.info("Deleted HBase table " + htableName); } else { log.info("HBase table" + htableName + " does not exist"); diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java index 027c0ca..abf78f7 100644 --- a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java +++ b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java @@ -25,11 +25,12 @@ import org.apache.commons.cli.Options; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy; @@ -42,6 +43,7 @@ import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.ToolRunner; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.persistence.HBaseConnection; import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; @@ -81,7 +83,10 @@ public class CreateHTableJob extends AbstractHadoopJob { tableDesc.setValue(IRealizationConstants.HTableTag, config.getMetadataUrlPrefix()); Configuration conf = HadoopUtil.getCurrentHBaseConfiguration(); - HBaseAdmin admin = new HBaseAdmin(conf); + //TODO: to check + String hbaseZookeeperQuorum = conf.get(HConstants.ZOOKEEPER_QUORUM).startsWith("hbase") ? 
conf.get(HConstants.ZOOKEEPER_QUORUM) : config.getStorageUrl(); + Connection connection = HBaseConnection.get(hbaseZookeeperQuorum); + Admin admin = connection.getAdmin(); try { if (User.isHBaseSecurityEnabled(conf)) { @@ -139,7 +144,7 @@ public class CreateHTableJob extends AbstractHadoopJob { byte[][] splitKeys = getSplits(conf, partitionFilePath); - if (admin.tableExists(tableName)) { + if (admin.tableExists(TableName.valueOf(tableName))) { // admin.disableTable(tableName); // admin.deleteTable(tableName); throw new RuntimeException("HBase table " + tableName + " exists!"); diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java index ef35f08..966871e 100644 --- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java +++ b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java @@ -19,17 +19,24 @@ package org.apache.kylin.job.hadoop.invertedindex; import org.apache.commons.cli.Options; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat; +import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.ToolRunner; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.persistence.HBaseConnection; import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.job.hadoop.AbstractHadoopJob; import org.slf4j.Logger; @@ -45,7 +52,9 @@ public class IICreateHFileJob extends AbstractHadoopJob { public int run(String[] args) throws Exception { Options options = new Options(); - + Connection connection = null; + Table table = null; + try { options.addOption(OPTION_JOB_NAME); options.addOption(OPTION_II_NAME); @@ -69,8 +78,13 @@ public class IICreateHFileJob extends AbstractHadoopJob { job.setMapOutputValueClass(KeyValue.class); String tableName = getOptionValue(OPTION_HTABLE_NAME); - HTable htable = new HTable(HBaseConfiguration.create(getConf()), tableName); - HFileOutputFormat.configureIncrementalLoad(job, htable); + + Configuration conf = HBaseConfiguration.create(getConf()); + String hbaseZookeeperQuorum = conf.get(HConstants.ZOOKEEPER_QUORUM).startsWith("hbase") ? 
conf.get(HConstants.ZOOKEEPER_QUORUM) : KylinConfig.getInstanceFromEnv().getStorageUrl(); + connection = HBaseConnection.get(hbaseZookeeperQuorum); + table = connection.getTable(TableName.valueOf(tableName)); + RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(tableName)); + HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator); this.deletePath(job.getConfiguration(), output); @@ -78,6 +92,8 @@ public class IICreateHFileJob extends AbstractHadoopJob { } catch (Exception e) { printUsage(options); throw e; + } finally { + IOUtils.closeQuietly(table); } } diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHTableJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHTableJob.java index a23b876..ff791eb 100644 --- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHTableJob.java +++ b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHTableJob.java @@ -22,13 +22,16 @@ import org.apache.commons.cli.Options; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.util.ToolRunner; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.persistence.HBaseConnection; import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.invertedindex.IIInstance; @@ -78,10 +81,12 @@ public class IICreateHTableJob extends AbstractHadoopJob { DeployCoprocessorCLI.deployCoprocessor(tableDesc); // drop the table first - HBaseAdmin admin = new HBaseAdmin(conf); - if (admin.tableExists(tableName)) { - admin.disableTable(tableName); - admin.deleteTable(tableName); + String hbaseZookeeperQuorum = conf.get(HConstants.ZOOKEEPER_QUORUM).startsWith("hbase") ? 
conf.get(HConstants.ZOOKEEPER_QUORUM) : config.getInstanceFromEnv().getStorageUrl(); + Connection connection = HBaseConnection.get(hbaseZookeeperQuorum); + Admin admin = connection.getAdmin(); + if (admin.tableExists(TableName.valueOf(tableName))) { + admin.disableTable(TableName.valueOf(tableName)); + admin.deleteTable(TableName.valueOf(tableName)); } // create table diff --git a/job/src/main/java/org/apache/kylin/job/tools/CleanHtableCLI.java b/job/src/main/java/org/apache/kylin/job/tools/CleanHtableCLI.java index b6e5af5..b53370f 100644 --- a/job/src/main/java/org/apache/kylin/job/tools/CleanHtableCLI.java +++ b/job/src/main/java/org/apache/kylin/job/tools/CleanHtableCLI.java @@ -23,9 +23,13 @@ import java.io.IOException; import org.apache.commons.cli.Options; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.util.ToolRunner; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.persistence.HBaseConnection; import org.apache.kylin.job.hadoop.AbstractHadoopJob; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,7 +59,10 @@ public class CleanHtableCLI extends AbstractHadoopJob { private void clean() throws IOException { Configuration conf = HBaseConfiguration.create(); - HBaseAdmin hbaseAdmin = new HBaseAdmin(conf); + //TODO: to check + String hbaseZookeeperQuorum = conf.get(HConstants.ZOOKEEPER_QUORUM).startsWith("hbase") ? conf.get(HConstants.ZOOKEEPER_QUORUM) : KylinConfig.getInstanceFromEnv().getStorageUrl(); + Connection connection = HBaseConnection.get(hbaseZookeeperQuorum); + Admin hbaseAdmin = connection.getAdmin(); for (HTableDescriptor descriptor : hbaseAdmin.listTables()) { String name = descriptor.getNameAsString().toLowerCase(); diff --git a/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java b/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java index 09e4fee..28a92f9 100644 --- a/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java +++ b/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java @@ -24,14 +24,18 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.persistence.HBaseConnection; import org.apache.kylin.common.persistence.JsonSerializer; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.persistence.Serializer; @@ -73,7 +77,7 @@ public class CubeMigrationCLI { private static ResourceStore srcStore; private static ResourceStore dstStore; private static FileSystem hdfsFS; - private static HBaseAdmin hbaseAdmin; + private static Admin hbaseAdmin; public static void main(String[] args) throws IOException, InterruptedException { @@ -114,7 +118,10 @@ public 
class CubeMigrationCLI { checkAndGetHbaseUrl(); Configuration conf = HBaseConfiguration.create(); - hbaseAdmin = new HBaseAdmin(conf); + //TODO: to check + String hbaseZookeeperQuorum = conf.get(HConstants.ZOOKEEPER_QUORUM).startsWith("hbase") ? conf.get(HConstants.ZOOKEEPER_QUORUM) : KylinConfig.getInstanceFromEnv().getStorageUrl(); + Connection connection = HBaseConnection.get(hbaseZookeeperQuorum); + hbaseAdmin = connection.getAdmin(); hdfsFS = FileSystem.get(new Configuration()); @@ -130,6 +137,8 @@ public class CubeMigrationCLI { } else { showOpts(); } + + IOUtils.closeQuietly(hbaseAdmin); } public static void moveCube(String srcCfgUri, String dstCfgUri, String cubeName, String projectName, String overwriteIfExists, String realExecute) throws IOException, InterruptedException { @@ -284,10 +293,10 @@ public class CubeMigrationCLI { case CHANGE_HTABLE_HOST: { String tableName = (String) opt.params[0]; HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName)); - hbaseAdmin.disableTable(tableName); + hbaseAdmin.disableTable(TableName.valueOf(tableName)); desc.setValue(IRealizationConstants.HTableTag, dstConfig.getMetadataUrlPrefix()); - hbaseAdmin.modifyTable(tableName, desc); - hbaseAdmin.enableTable(tableName); + hbaseAdmin.modifyTable(TableName.valueOf(tableName), desc); + hbaseAdmin.enableTable(TableName.valueOf(tableName)); logger.info("CHANGE_HTABLE_HOST is completed"); break; } @@ -401,10 +410,10 @@ public class CubeMigrationCLI { case CHANGE_HTABLE_HOST: { String tableName = (String) opt.params[0]; HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName)); - hbaseAdmin.disableTable(tableName); + hbaseAdmin.disableTable(TableName.valueOf(tableName)); desc.setValue(IRealizationConstants.HTableTag, srcConfig.getMetadataUrlPrefix()); - hbaseAdmin.modifyTable(tableName, desc); - hbaseAdmin.enableTable(tableName); + hbaseAdmin.modifyTable(TableName.valueOf(tableName), desc); + hbaseAdmin.enableTable(TableName.valueOf(tableName)); break; } case COPY_FILE_IN_META: { diff --git a/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java b/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java index 89472b2..d993c43 100644 --- a/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java +++ b/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java @@ -39,9 +39,11 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; -import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.persistence.HBaseConnection; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.cube.CubeInstance; @@ -68,8 +70,10 @@ public class DeployCoprocessorCLI { KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); Configuration hconf = HadoopUtil.newHBaseConfiguration(kylinConfig.getStorageUrl()); FileSystem fileSystem = FileSystem.get(hconf); - HBaseAdmin hbaseAdmin = new HBaseAdmin(hconf); - + String hbaseZookeeperQuorum = kylinConfig.getStorageUrl(); + Connection connection = HBaseConnection.get(hbaseZookeeperQuorum); + Admin hbaseAdmin = connection.getAdmin(); + String localCoprocessorJar = new 
File(args[0]).getAbsolutePath(); logger.info("Identify coprocessor jar " + localCoprocessorJar); @@ -96,10 +100,10 @@ public class DeployCoprocessorCLI { public static void deployCoprocessor(HTableDescriptor tableDesc) { try { initHTableCoprocessor(tableDesc); - logger.info("hbase table " + tableDesc.getName() + " deployed with coprocessor."); + logger.info("hbase table " + tableDesc.getTableName() + " deployed with coprocessor."); } catch (Exception ex) { - logger.error("Error deploying coprocessor on " + tableDesc.getName(), ex); + logger.error("Error deploying coprocessor on " + tableDesc.getTableName(), ex); logger.error("Will try creating the table without coprocessor."); } } @@ -121,9 +125,9 @@ public class DeployCoprocessorCLI { desc.addCoprocessor(OBSERVER_CLS_NAME, hdfsCoprocessorJar, 1001, null); } - public static void resetCoprocessor(String tableName, HBaseAdmin hbaseAdmin, Path hdfsCoprocessorJar) throws IOException { + public static void resetCoprocessor(String tableName, Admin hbaseAdmin, Path hdfsCoprocessorJar) throws IOException { logger.info("Disable " + tableName); - hbaseAdmin.disableTable(tableName); + hbaseAdmin.disableTable(TableName.valueOf(tableName)); logger.info("Unset coprocessor on " + tableName); HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName)); @@ -135,13 +139,13 @@ public class DeployCoprocessorCLI { } addCoprocessorOnHTable(desc, hdfsCoprocessorJar); - hbaseAdmin.modifyTable(tableName, desc); + hbaseAdmin.modifyTable(TableName.valueOf(tableName), desc); logger.info("Enable " + tableName); - hbaseAdmin.enableTable(tableName); + hbaseAdmin.enableTable(TableName.valueOf(tableName)); } - private static List resetCoprocessorOnHTables(HBaseAdmin hbaseAdmin, Path hdfsCoprocessorJar, List tableNames) throws IOException { + private static List resetCoprocessorOnHTables(Admin hbaseAdmin, Path hdfsCoprocessorJar, List tableNames) throws IOException { List processed = new ArrayList(); for (String tableName : tableNames) { @@ -248,7 +252,7 @@ public class DeployCoprocessorCLI { return coprocessorDir; } - private static Set getCoprocessorJarPaths(HBaseAdmin hbaseAdmin, List tableNames) throws IOException { + private static Set getCoprocessorJarPaths(Admin hbaseAdmin, List tableNames) throws IOException { HashSet result = new HashSet(); for (String tableName : tableNames) { diff --git a/job/src/main/java/org/apache/kylin/job/tools/GridTableHBaseBenchmark.java b/job/src/main/java/org/apache/kylin/job/tools/GridTableHBaseBenchmark.java index 70e1df6..41d6aa7 100644 --- a/job/src/main/java/org/apache/kylin/job/tools/GridTableHBaseBenchmark.java +++ b/job/src/main/java/org/apache/kylin/job/tools/GridTableHBaseBenchmark.java @@ -28,13 +28,13 @@ import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.filter.KeyOnlyFilter; import org.apache.kylin.common.persistence.HBaseConnection; import 
org.apache.kylin.common.util.Bytes; @@ -74,8 +74,7 @@ public class GridTableHBaseBenchmark { public static void testGridTable(double hitRatio, double indexRatio) throws IOException { System.out.println("Testing grid table scanning, hit ratio " + hitRatio + ", index ratio " + indexRatio); String hbaseUrl = "hbase"; // use hbase-site.xml on classpath - - HConnection conn = HBaseConnection.get(hbaseUrl); + Connection conn = HBaseConnection.get(hbaseUrl); createHTableIfNeeded(conn, TEST_TABLE); prepareData(conn); @@ -91,10 +90,10 @@ public class GridTableHBaseBenchmark { } - private static void testColumnScan(HConnection conn, List> colScans) throws IOException { + private static void testColumnScan(Connection conn, List> colScans) throws IOException { Stats stats = new Stats("COLUMN_SCAN"); - HTableInterface table = conn.getTable(TEST_TABLE); + Table table = conn.getTable(TableName.valueOf(TEST_TABLE)); try { stats.markStart(); @@ -122,20 +121,20 @@ public class GridTableHBaseBenchmark { } } - private static void testRowScanNoIndexFullScan(HConnection conn, boolean[] hits) throws IOException { + private static void testRowScanNoIndexFullScan(Connection conn, boolean[] hits) throws IOException { fullScan(conn, hits, new Stats("ROW_SCAN_NO_IDX_FULL")); } - private static void testRowScanNoIndexSkipScan(HConnection conn, boolean[] hits) throws IOException { + private static void testRowScanNoIndexSkipScan(Connection conn, boolean[] hits) throws IOException { jumpScan(conn, hits, new Stats("ROW_SCAN_NO_IDX_SKIP")); } - private static void testRowScanWithIndex(HConnection conn, boolean[] hits) throws IOException { + private static void testRowScanWithIndex(Connection conn, boolean[] hits) throws IOException { jumpScan(conn, hits, new Stats("ROW_SCAN_IDX")); } - private static void fullScan(HConnection conn, boolean[] hits, Stats stats) throws IOException { - HTableInterface table = conn.getTable(TEST_TABLE); + private static void fullScan(Connection conn, boolean[] hits, Stats stats) throws IOException { + Table table = conn.getTable(TableName.valueOf(TEST_TABLE)); try { stats.markStart(); @@ -156,11 +155,11 @@ public class GridTableHBaseBenchmark { } } - private static void jumpScan(HConnection conn, boolean[] hits, Stats stats) throws IOException { + private static void jumpScan(Connection conn, boolean[] hits, Stats stats) throws IOException { final int jumpThreshold = 6; // compensate for Scan() overhead, totally by experience - HTableInterface table = conn.getTable(TEST_TABLE); + Table table = conn.getTable(TableName.valueOf(TEST_TABLE)); try { stats.markStart(); @@ -204,8 +203,8 @@ public class GridTableHBaseBenchmark { } } - private static void prepareData(HConnection conn) throws IOException { - HTableInterface table = conn.getTable(TEST_TABLE); + private static void prepareData(Connection conn) throws IOException { + Table table = conn.getTable(TableName.valueOf(TEST_TABLE)); try { // check how many rows existing @@ -232,7 +231,7 @@ public class GridTableHBaseBenchmark { byte[] rowkey = Bytes.toBytes(i); Put put = new Put(rowkey); byte[] cell = randomBytes(); - put.add(CF, QN, cell); + put.addColumn(CF, QN, cell); table.put(put); nBytes += cell.length; dot(i, N_ROWS); @@ -258,9 +257,9 @@ public class GridTableHBaseBenchmark { return bytes; } - private static void createHTableIfNeeded(HConnection conn, String tableName) throws IOException { - HBaseAdmin hbase = new HBaseAdmin(conn); - + private static void createHTableIfNeeded(Connection conn, String tableName) throws IOException { + Admin 
hbase = conn.getAdmin(); + try { boolean tableExist = false; try { diff --git a/job/src/main/java/org/apache/kylin/job/tools/HtableAlterMetadataCLI.java b/job/src/main/java/org/apache/kylin/job/tools/HtableAlterMetadataCLI.java index 53930e3..1d024f1 100644 --- a/job/src/main/java/org/apache/kylin/job/tools/HtableAlterMetadataCLI.java +++ b/job/src/main/java/org/apache/kylin/job/tools/HtableAlterMetadataCLI.java @@ -25,10 +25,14 @@ import org.apache.commons.cli.OptionBuilder; import org.apache.commons.cli.Options; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.util.ToolRunner; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.persistence.HBaseConnection; import org.apache.kylin.job.hadoop.AbstractHadoopJob; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,7 +76,10 @@ public class HtableAlterMetadataCLI extends AbstractHadoopJob { private void alter() throws IOException { Configuration conf = HBaseConfiguration.create(); - HBaseAdmin hbaseAdmin = new HBaseAdmin(conf); + //TODO: to check + String hbaseZookeeperQuorum = conf.get(HConstants.ZOOKEEPER_QUORUM).startsWith("hbase") ? conf.get(HConstants.ZOOKEEPER_QUORUM) : KylinConfig.getInstanceFromEnv().getStorageUrl(); + Connection connection = HBaseConnection.get(hbaseZookeeperQuorum); + Admin hbaseAdmin = connection.getAdmin(); HTableDescriptor table = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName)); hbaseAdmin.disableTable(table.getTableName()); diff --git a/job/src/main/java/org/apache/kylin/job/tools/RowCounterCLI.java b/job/src/main/java/org/apache/kylin/job/tools/RowCounterCLI.java index 3329d27..4d44088 100644 --- a/job/src/main/java/org/apache/kylin/job/tools/RowCounterCLI.java +++ b/job/src/main/java/org/apache/kylin/job/tools/RowCounterCLI.java @@ -22,11 +22,12 @@ import java.util.Iterator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HConnectionManager; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.BytesUtil; import org.slf4j.Logger; @@ -69,8 +70,8 @@ public class RowCounterCLI { logger.info("My Scan " + scan.toString()); - HConnection conn = HConnectionManager.createConnection(conf); - HTableInterface tableInterface = conn.getTable(htableName); + Connection conn = ConnectionFactory.createConnection(conf); + Table tableInterface = conn.getTable(TableName.valueOf(htableName)); Iterator iterator = tableInterface.getScanner(scan).iterator(); int counter = 0; diff --git a/job/src/test/java/org/apache/kylin/job/ExportHBaseData.java b/job/src/test/java/org/apache/kylin/job/ExportHBaseData.java index e784a41..33bd4d9 100644 --- a/job/src/test/java/org/apache/kylin/job/ExportHBaseData.java +++ 
b/job/src/test/java/org/apache/kylin/job/ExportHBaseData.java @@ -22,10 +22,11 @@ import java.io.File; import java.io.IOException; import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.HBaseConnection; import org.apache.kylin.common.util.AbstractKylinTestCase; @@ -39,7 +40,7 @@ public class ExportHBaseData { KylinConfig kylinConfig; HTableDescriptor[] allTables; Configuration config; - HBaseAdmin hbase; + Admin hbase; CliCommandExecutor cli; String exportHdfsFolder; String exportLocalFolderParent; @@ -75,10 +76,9 @@ public class ExportHBaseData { int cut = metadataUrl.indexOf('@'); tableNameBase = metadataUrl.substring(0, cut); String hbaseUrl = cut < 0 ? metadataUrl : metadataUrl.substring(cut + 1); - - HConnection conn = HBaseConnection.get(hbaseUrl); + Connection conn = HBaseConnection.get(hbaseUrl); try { - hbase = new HBaseAdmin(conn); + hbase = conn.getAdmin(); config = hbase.getConfiguration(); allTables = hbase.listTables(); } catch (IOException e) { @@ -89,6 +89,8 @@ public class ExportHBaseData { public void tearDown() { + // close hbase admin + IOUtils.closeQuietly(hbase); // cleanup hdfs try { if (cli != null && exportHdfsFolder != null) { diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/hbase/TestHbaseClient.java b/job/src/test/java/org/apache/kylin/job/hadoop/hbase/TestHbaseClient.java index f2b9ed6..c2f80d3 100644 --- a/job/src/test/java/org/apache/kylin/job/hadoop/hbase/TestHbaseClient.java +++ b/job/src/test/java/org/apache/kylin/job/hadoop/hbase/TestHbaseClient.java @@ -20,10 +20,16 @@ package org.apache.kylin.job.hadoop.hbase; import java.io.IOException; +import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; +import org.apache.kylin.common.persistence.HBaseConnection; import org.apache.kylin.common.util.Bytes; /** @@ -90,13 +96,15 @@ public class TestHbaseClient { conf.set("hbase.zookeeper.quorum", "hbase_host"); conf.set("zookeeper.znode.parent", "/hbase-unsecure"); - HTable table = new HTable(conf, "test1"); + Connection connection = ConnectionFactory.createConnection(conf); + Table table = connection.getTable(TableName.valueOf("test1")); Put put = new Put(Bytes.toBytes("row1")); - - put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"), Bytes.toBytes("val1")); - put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual2"), Bytes.toBytes("val2")); + + put.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"), Bytes.toBytes("val1")); + put.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("qual2"), Bytes.toBytes("val2")); table.put(put); table.close(); + connection.close(); } } diff --git a/job/src/test/java/org/apache/kylin/job/tools/HBaseRowDigestTest.java b/job/src/test/java/org/apache/kylin/job/tools/HBaseRowDigestTest.java index 9f9c23c..ea5e477 
100644 --- a/job/src/test/java/org/apache/kylin/job/tools/HBaseRowDigestTest.java +++ b/job/src/test/java/org/apache/kylin/job/tools/HBaseRowDigestTest.java @@ -23,10 +23,11 @@ import java.io.IOException; import org.apache.commons.io.FileUtils; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.kylin.common.persistence.HBaseConnection; import org.apache.kylin.common.util.BytesUtil; @@ -60,11 +61,11 @@ public class HBaseRowDigestTest extends HBaseMetadataTestCase { @Test public static void test() throws IOException { String hbaseUrl = "hbase"; // use hbase-site.xml on classpath - HConnection conn = null; - HTableInterface table = null; + Connection conn = null; + Table table = null; try { - conn = HBaseConnection.get(hbaseUrl); - table = conn.getTable("KYLIN_II_YTYWP3CQGJ"); + conn = HBaseConnection.get(hbaseUrl); + table = conn.getTable(TableName.valueOf("KYLIN_II_YTYWP3CQGJ")); ResultScanner scanner = table.getScanner(CF, QN); StringBuffer sb = new StringBuffer(); while (true) { diff --git a/monitor/pom.xml b/monitor/pom.xml index 566c873..214d903 100644 --- a/monitor/pom.xml +++ b/monitor/pom.xml @@ -38,6 +38,12 @@ + + org.apache.kylin + kylin-common + ${project.parent.version} + + junit junit diff --git a/monitor/src/main/java/org/apache/kylin/monitor/MonitorMetaManager.java b/monitor/src/main/java/org/apache/kylin/monitor/MonitorMetaManager.java index 97200fc..bafb8b4 100644 --- a/monitor/src/main/java/org/apache/kylin/monitor/MonitorMetaManager.java +++ b/monitor/src/main/java/org/apache/kylin/monitor/MonitorMetaManager.java @@ -20,17 +20,22 @@ package org.apache.kylin.monitor; import java.io.IOException; +import org.apache.kylin.common.persistence.HBaseConnection; +import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; import org.apache.log4j.Logger; @@ -42,6 +47,8 @@ public class MonitorMetaManager { private static ConfigUtils monitorConfig = ConfigUtils.getInstance(); static String TABLE_NAME = "kylin_metadata"; + static String hbaseZookeeperQuorum = null; + static Connection connection = null; final static String COLUMN_FAMILY = "f"; final static String COLUMN = "c"; final static String ROW_KEY_QUERY_READ_FILES = "/performance/query_log_files_already_read"; @@ -53,7 +60,7 @@ public class MonitorMetaManager { final static Logger logger = Logger.getLogger(MonitorMetaManager.class); static Configuration conf = null; - + static { 
try { monitorConfig.loadMonitorParam(); @@ -69,6 +76,8 @@ public class MonitorMetaManager { */ public static void init() throws Exception { MonitorMetaManager.TABLE_NAME = monitorConfig.getMetadataUrlPrefix(); + MonitorMetaManager.hbaseZookeeperQuorum = conf.get(HConstants.ZOOKEEPER_QUORUM); + MonitorMetaManager.connection = HBaseConnection.get(hbaseZookeeperQuorum); logger.info("Monitor Metadata Table :" + MonitorMetaManager.TABLE_NAME); logger.info("init monitor metadata,create table if not exist"); MonitorMetaManager.creatTable(TABLE_NAME, new String[] { COLUMN_FAMILY }); @@ -163,17 +172,29 @@ public class MonitorMetaManager { /* * create table in hbase */ - public static void creatTable(String tableName, String[] family) throws Exception { - HBaseAdmin admin = new HBaseAdmin(conf); - HTableDescriptor desc = new HTableDescriptor(tableName); - for (int i = 0; i < family.length; i++) { - desc.addFamily(new HColumnDescriptor(family[i])); - } - if (admin.tableExists(tableName)) { - logger.info("table Exists!"); - } else { - admin.createTable(desc); - logger.info("create table Success!"); + public static void creatTable(String tableName, String[] family) throws IOException { + Admin admin = null; + HTableDescriptor desc; + + try { + admin = connection.getAdmin(); + desc = new HTableDescriptor(TableName.valueOf(tableName)); + + for (int i = 0; i < family.length; i++) { + desc.addFamily(new HColumnDescriptor(family[i])); + } + + if (admin.tableExists(TableName.valueOf(tableName))) { + logger.info("table Exists!"); + + } else { + admin.createTable(desc); + logger.info("create table Success!"); + } + } catch (IOException ioException) { + throw ioException; + } finally { + IOUtils.closeQuietly(admin); } } @@ -181,13 +202,15 @@ public class MonitorMetaManager { * update cell in hbase */ public static void updateData(String tableName, String rowKey, String family, String column, String value) throws IOException { - HTable table = new HTable(conf, Bytes.toBytes(tableName)); + Table table = connection.getTable(TableName.valueOf(tableName)); Put put = new Put(rowKey.getBytes()); - put.add(family.getBytes(), column.getBytes(), value.getBytes()); + put.addColumn(family.getBytes(), column.getBytes(), value.getBytes()); try { table.put(put); } catch (IOException e) { e.printStackTrace(); + } finally { + IOUtils.closeQuietly(table); } logger.info("update table [" + tableName + "]"); logger.info("rowKey [" + rowKey + "]"); @@ -200,9 +223,10 @@ public class MonitorMetaManager { * get result by rowkey */ public static Result getResultByRowKey(String tableName, String rowKey) throws IOException { - HTable table = new HTable(conf, Bytes.toBytes(tableName)); + Table table = connection.getTable(TableName.valueOf(tableName)); Get get = new Get(Bytes.toBytes(rowKey)); Result result = table.get(get); + IOUtils.closeQuietly(table); return result; } diff --git a/pom.xml b/pom.xml index 23478d5..68ad048 100644 --- a/pom.xml +++ b/pom.xml @@ -49,7 +49,7 @@ 2.7.0 3.4.6 1.1.0 - 1.1.0 + 1.2.1 1.1.0 2.7.1 diff --git a/server/src/main/java/org/apache/kylin/rest/service/AclService.java b/server/src/main/java/org/apache/kylin/rest/service/AclService.java index ea2a48e..8a1cf6d 100644 --- a/server/src/main/java/org/apache/kylin/rest/service/AclService.java +++ b/server/src/main/java/org/apache/kylin/rest/service/AclService.java @@ -29,13 +29,14 @@ import java.util.Map; import java.util.NavigableMap; import org.apache.commons.io.IOUtils; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Delete; 
import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; import org.apache.kylin.common.KylinConfig; @@ -130,9 +131,9 @@ public class AclService implements MutableAclService { @Override public List findChildren(ObjectIdentity parentIdentity) { List oids = new ArrayList(); - HTableInterface htable = null; + Table htable = null; try { - htable = HBaseConnection.get(hbaseUrl).getTable(aclTableName); + htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(aclTableName)); Scan scan = new Scan(); SingleColumnValueFilter parentFilter = new SingleColumnValueFilter(Bytes.toBytes(ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_PARENT_COLUMN), CompareOp.EQUAL, domainObjSerializer.serialize(new DomainObjectInfo(parentIdentity))); @@ -179,10 +180,10 @@ public class AclService implements MutableAclService { @Override public Map readAclsById(List oids, List sids) throws NotFoundException { Map aclMaps = new HashMap(); - HTableInterface htable = null; + Table htable = null; Result result = null; try { - htable = HBaseConnection.get(hbaseUrl).getTable(aclTableName); + htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(aclTableName)); for (ObjectIdentity oid : oids) { result = htable.get(new Get(Bytes.toBytes(String.valueOf(oid.getIdentifier())))); @@ -231,16 +232,15 @@ public class AclService implements MutableAclService { Authentication auth = SecurityContextHolder.getContext().getAuthentication(); PrincipalSid sid = new PrincipalSid(auth); - HTableInterface htable = null; + Table htable = null; try { - htable = HBaseConnection.get(hbaseUrl).getTable(aclTableName); + htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(aclTableName)); Put put = new Put(Bytes.toBytes(String.valueOf(objectIdentity.getIdentifier()))); - put.add(Bytes.toBytes(ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_TYPE_COLUMN), Bytes.toBytes(objectIdentity.getType())); - put.add(Bytes.toBytes(ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_OWNER_COLUMN), sidSerializer.serialize(new SidInfo(sid))); - put.add(Bytes.toBytes(ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_ENTRY_INHERIT_COLUMN), Bytes.toBytes(true)); + put.addColumn(Bytes.toBytes(ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_TYPE_COLUMN), Bytes.toBytes(objectIdentity.getType())); + put.addColumn(Bytes.toBytes(ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_OWNER_COLUMN), sidSerializer.serialize(new SidInfo(sid))); + put.addColumn(Bytes.toBytes(ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_ENTRY_INHERIT_COLUMN), Bytes.toBytes(true)); htable.put(put); - htable.flushCommits(); logger.debug("ACL of " + objectIdentity + " created successfully."); } catch (IOException e) { @@ -254,9 +254,9 @@ public class AclService implements MutableAclService { @Override public void deleteAcl(ObjectIdentity objectIdentity, boolean deleteChildren) throws ChildrenExistException { - HTableInterface htable = null; + Table htable = null; try { - htable = HBaseConnection.get(hbaseUrl).getTable(aclTableName); + htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(aclTableName)); Delete delete = new 
Delete(Bytes.toBytes(String.valueOf(objectIdentity.getIdentifier()))); List children = findChildren(objectIdentity); @@ -269,7 +269,6 @@ public class AclService implements MutableAclService { } htable.delete(delete); - htable.flushCommits(); logger.debug("ACL of " + objectIdentity + " deleted successfully."); } catch (IOException e) { @@ -287,27 +286,26 @@ public class AclService implements MutableAclService { throw e; } - HTableInterface htable = null; + Table htable = null; try { - htable = HBaseConnection.get(hbaseUrl).getTable(aclTableName); + htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(aclTableName)); Delete delete = new Delete(Bytes.toBytes(String.valueOf(acl.getObjectIdentity().getIdentifier()))); - delete.deleteFamily(Bytes.toBytes(ACL_ACES_FAMILY)); + delete.addFamily(Bytes.toBytes(ACL_ACES_FAMILY)); htable.delete(delete); Put put = new Put(Bytes.toBytes(String.valueOf(acl.getObjectIdentity().getIdentifier()))); if (null != acl.getParentAcl()) { - put.add(Bytes.toBytes(ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_PARENT_COLUMN), domainObjSerializer.serialize(new DomainObjectInfo(acl.getParentAcl().getObjectIdentity()))); + put.addColumn(Bytes.toBytes(ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_PARENT_COLUMN), domainObjSerializer.serialize(new DomainObjectInfo(acl.getParentAcl().getObjectIdentity()))); } for (AccessControlEntry ace : acl.getEntries()) { AceInfo aceInfo = new AceInfo(ace); - put.add(Bytes.toBytes(ACL_ACES_FAMILY), Bytes.toBytes(aceInfo.getSidInfo().getSid()), aceSerializer.serialize(aceInfo)); + put.addColumn(Bytes.toBytes(ACL_ACES_FAMILY), Bytes.toBytes(aceInfo.getSidInfo().getSid()), aceSerializer.serialize(aceInfo)); } if (!put.isEmpty()) { htable.put(put); - htable.flushCommits(); logger.debug("ACL of " + acl.getObjectIdentity() + " updated successfully."); } diff --git a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java index fdfd136..756f0c9 100644 --- a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java +++ b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java @@ -29,9 +29,9 @@ import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Connection; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.persistence.HBaseConnection; import org.apache.kylin.common.util.HBaseRegionSizeCalculator; import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.cube.CubeInstance; @@ -432,34 +432,25 @@ public class CubeService extends BasicService { public HBaseResponse getHTableInfo(String tableName) throws IOException { // Get HBase storage conf. 
String hbaseUrl = KylinConfig.getInstanceFromEnv().getStorageUrl(); - Configuration hconf = HadoopUtil.newHBaseConfiguration(hbaseUrl); - - HTable table = null; + Connection hbaseConnection = HBaseConnection.get(hbaseUrl); + HBaseResponse hr = null; long tableSize = 0; int regionCount = 0; - try { - table = new HTable(hconf, tableName); - - HBaseRegionSizeCalculator cal = new HBaseRegionSizeCalculator(table); - Map sizeMap = cal.getRegionSizeMap(); + HBaseRegionSizeCalculator cal = new HBaseRegionSizeCalculator(tableName, hbaseConnection); + Map sizeMap = cal.getRegionSizeMap(); - for (long s : sizeMap.values()) { - tableSize += s; - } + for (long s : sizeMap.values()) { + tableSize += s; + } - regionCount = sizeMap.size(); + regionCount = sizeMap.size(); - // Set response. - hr = new HBaseResponse(); - hr.setTableSize(tableSize); - hr.setRegionCount(regionCount); - } finally { - if (null != table) { - table.close(); - } - } + // Set response. + hr = new HBaseResponse(); + hr.setTableSize(tableSize); + hr.setRegionCount(regionCount); return hr; } diff --git a/server/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server/src/main/java/org/apache/kylin/rest/service/QueryService.java index 8a397cd..854a5d0 100644 --- a/server/src/main/java/org/apache/kylin/rest/service/QueryService.java +++ b/server/src/main/java/org/apache/kylin/rest/service/QueryService.java @@ -42,10 +42,11 @@ import javax.sql.DataSource; import org.apache.calcite.avatica.ColumnMetaData.Rep; import org.apache.commons.io.IOUtils; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.debug.BackdoorToggles; import org.apache.kylin.common.persistence.HBaseConnection; @@ -124,14 +125,13 @@ public class QueryService extends BasicService { Query[] queryArray = new Query[queries.size()]; byte[] bytes = querySerializer.serialize(queries.toArray(queryArray)); - HTableInterface htable = null; + Table htable = null; try { - htable = HBaseConnection.get(hbaseUrl).getTable(userTableName); + htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName)); Put put = new Put(Bytes.toBytes(creator)); - put.add(Bytes.toBytes(USER_QUERY_FAMILY), Bytes.toBytes(USER_QUERY_COLUMN), bytes); + put.addColumn(Bytes.toBytes(USER_QUERY_FAMILY), Bytes.toBytes(USER_QUERY_COLUMN), bytes); htable.put(put); - htable.flushCommits(); } finally { IOUtils.closeQuietly(htable); } @@ -157,14 +157,13 @@ public class QueryService extends BasicService { Query[] queryArray = new Query[queries.size()]; byte[] bytes = querySerializer.serialize(queries.toArray(queryArray)); - HTableInterface htable = null; + Table htable = null; try { - htable = HBaseConnection.get(hbaseUrl).getTable(userTableName); + htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName)); Put put = new Put(Bytes.toBytes(creator)); - put.add(Bytes.toBytes(USER_QUERY_FAMILY), Bytes.toBytes(USER_QUERY_COLUMN), bytes); + put.addColumn(Bytes.toBytes(USER_QUERY_FAMILY), Bytes.toBytes(USER_QUERY_COLUMN), bytes); htable.put(put); - htable.flushCommits(); } finally { IOUtils.closeQuietly(htable); } @@ -176,9 +175,9 @@ public class QueryService extends BasicService { } List queries = new ArrayList(); - HTableInterface htable = null; + Table htable = null; try 
{ - htable = HBaseConnection.get(hbaseUrl).getTable(userTableName); + htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName)); Get get = new Get(Bytes.toBytes(creator)); get.addFamily(Bytes.toBytes(USER_QUERY_FAMILY)); Result result = htable.get(get); diff --git a/server/src/main/java/org/apache/kylin/rest/service/UserService.java b/server/src/main/java/org/apache/kylin/rest/service/UserService.java index d665ab9..d03cd55 100644 --- a/server/src/main/java/org/apache/kylin/rest/service/UserService.java +++ b/server/src/main/java/org/apache/kylin/rest/service/UserService.java @@ -25,13 +25,14 @@ import java.util.Collection; import java.util.List; import org.apache.commons.io.IOUtils; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.HBaseConnection; import org.apache.kylin.common.util.Bytes; @@ -75,9 +76,9 @@ public class UserService implements UserManager { @Override public UserDetails loadUserByUsername(String username) throws UsernameNotFoundException { - HTableInterface htable = null; + Table htable = null; try { - htable = HBaseConnection.get(hbaseUrl).getTable(userTableName); + htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName)); Get get = new Get(Bytes.toBytes(username)); get.addFamily(Bytes.toBytes(USER_AUTHORITY_FAMILY)); @@ -106,15 +107,14 @@ public class UserService implements UserManager { @Override public void updateUser(UserDetails user) { - HTableInterface htable = null; + Table htable = null; try { byte[] userAuthorities = serialize(user.getAuthorities()); - htable = HBaseConnection.get(hbaseUrl).getTable(userTableName); + htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName)); Put put = new Put(Bytes.toBytes(user.getUsername())); - put.add(Bytes.toBytes(USER_AUTHORITY_FAMILY), Bytes.toBytes(USER_AUTHORITY_COLUMN), userAuthorities); + put.addColumn(Bytes.toBytes(USER_AUTHORITY_FAMILY), Bytes.toBytes(USER_AUTHORITY_COLUMN), userAuthorities); htable.put(put); - htable.flushCommits(); } catch (IOException e) { throw new RuntimeException(e.getMessage(), e); } finally { @@ -124,13 +124,12 @@ public class UserService implements UserManager { @Override public void deleteUser(String username) { - HTableInterface htable = null; + Table htable = null; try { - htable = HBaseConnection.get(hbaseUrl).getTable(userTableName); + htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName)); Delete delete = new Delete(Bytes.toBytes(username)); htable.delete(delete); - htable.flushCommits(); } catch (IOException e) { throw new RuntimeException(e.getMessage(), e); } finally { @@ -145,9 +144,9 @@ public class UserService implements UserManager { @Override public boolean userExists(String username) { - HTableInterface htable = null; + Table htable = null; try { - htable = HBaseConnection.get(hbaseUrl).getTable(userTableName); + htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName)); Result result = htable.get(new Get(Bytes.toBytes(username))); return null != result && !result.isEmpty(); @@ -164,10 +163,10 @@ public 
class UserService implements UserManager { s.addColumn(Bytes.toBytes(USER_AUTHORITY_FAMILY), Bytes.toBytes(USER_AUTHORITY_COLUMN)); List authorities = new ArrayList(); - HTableInterface htable = null; + Table htable = null; ResultScanner scanner = null; try { - htable = HBaseConnection.get(hbaseUrl).getTable(userTableName); + htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName)); scanner = htable.getScanner(s); for (Result result = scanner.next(); result != null; result = scanner.next()) { diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java index 9efbb79..83a794d 100644 --- a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java +++ b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java @@ -28,11 +28,12 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterList; @@ -82,7 +83,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator { private final Collection rowValueDecoders; private final StorageContext context; private final String tableName; - private final HTableInterface table; + private final Table table; private final RowKeyDecoder rowKeyDecoder; private final Iterator rangeIterator; @@ -94,7 +95,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator { private int scanCount; private int scanCountDelta; - public CubeSegmentTupleIterator(CubeSegment cubeSeg, Collection keyRanges, HConnection conn, Collection dimensions, TupleFilter filter, Collection groupBy, Collection rowValueDecoders, StorageContext context) { + public CubeSegmentTupleIterator(CubeSegment cubeSeg, Collection keyRanges, Connection conn, Collection dimensions, TupleFilter filter, Collection groupBy, Collection rowValueDecoders, StorageContext context) { this.cube = cubeSeg.getCubeInstance(); this.cubeSeg = cubeSeg; this.dimensions = dimensions; @@ -106,7 +107,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator { this.rowKeyDecoder = new RowKeyDecoder(this.cubeSeg); try { - this.table = conn.getTable(tableName); + this.table = conn.getTable(TableName.valueOf(tableName)); } catch (Throwable t) { throw new StorageException("Error when open connection to table " + tableName, t); } diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java index 8eb7bcb..9ae296f 100644 --- a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java +++ b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java @@ -32,7 +32,7 @@ import java.util.Map; import java.util.Set; import java.util.TreeSet; -import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.Connection; import org.apache.kylin.common.persistence.HBaseConnection; import 
org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.Pair; @@ -140,7 +140,7 @@ public class CubeStorageEngine implements IStorageEngine { setCoprocessor(groupsCopD, valueDecoders, context); // enable coprocessor if beneficial setLimit(filter, context); - HConnection conn = HBaseConnection.get(context.getConnUrl()); + Connection conn = HBaseConnection.get(context.getConnUrl()); return new SerializedHBaseTupleIterator(conn, scans, cubeInstance, dimensionsD, filterD, groupsCopD, valueDecoders, context); } diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseClientKVIterator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseClientKVIterator.java index 918fd4b..d27952a 100644 --- a/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseClientKVIterator.java +++ b/storage/src/main/java/org/apache/kylin/storage/hbase/HBaseClientKVIterator.java @@ -24,10 +24,11 @@ import java.util.Iterator; import org.apache.commons.io.IOUtils; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.kylin.common.util.Pair; import org.apache.kylin.invertedindex.model.IIDesc; @@ -41,15 +42,15 @@ public class HBaseClientKVIterator implements Iterable iterator; - public HBaseClientKVIterator(HConnection hconn, String tableName, byte[] family, byte[] qualifier) throws IOException { + public HBaseClientKVIterator(Connection hconn, String tableName, byte[] family, byte[] qualifier) throws IOException { this.family = family; this.qualifier = qualifier; - this.table = hconn.getTable(tableName); + this.table = hconn.getTable(TableName.valueOf(tableName)); this.scanner = table.getScanner(family, qualifier); this.iterator = scanner.iterator(); } diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/InvertedIndexStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/hbase/InvertedIndexStorageEngine.java index afb49c0..4a6ec1c 100644 --- a/storage/src/main/java/org/apache/kylin/storage/hbase/InvertedIndexStorageEngine.java +++ b/storage/src/main/java/org/apache/kylin/storage/hbase/InvertedIndexStorageEngine.java @@ -20,7 +20,7 @@ package org.apache.kylin.storage.hbase; import java.util.ArrayList; -import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.Connection; import org.apache.kylin.common.persistence.HBaseConnection; import org.apache.kylin.invertedindex.IIInstance; import org.apache.kylin.invertedindex.IISegment; @@ -46,7 +46,7 @@ public class InvertedIndexStorageEngine implements IStorageEngine { String tableName = seg.getStorageLocationIdentifier(); //HConnection is cached, so need not be closed - HConnection conn = HBaseConnection.get(context.getConnUrl()); + Connection conn = HBaseConnection.get(context.getConnUrl()); try { return new EndpointTupleIterator(seg, sqlDigest.filter, sqlDigest.groupbyColumns, new ArrayList<>(sqlDigest.aggregations), context, conn); } catch (Throwable e) { diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/PingHBaseCLI.java b/storage/src/main/java/org/apache/kylin/storage/hbase/PingHBaseCLI.java index a349fdb..5558f62 100644 --- 
a/storage/src/main/java/org/apache/kylin/storage/hbase/PingHBaseCLI.java +++ b/storage/src/main/java/org/apache/kylin/storage/hbase/PingHBaseCLI.java @@ -21,12 +21,13 @@ package org.apache.kylin.storage.hbase; import java.io.IOException; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HConnectionManager; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.token.TokenUtil; import org.apache.hadoop.security.UserGroupInformation; @@ -58,12 +59,12 @@ public class PingHBaseCLI { Scan scan = new Scan(); int limit = 20; - HConnection conn = null; - HTableInterface table = null; + Connection conn = null; + Table table = null; ResultScanner scanner = null; try { - conn = HConnectionManager.createConnection(hconf); - table = conn.getTable(hbaseTable); + conn = ConnectionFactory.createConnection(hconf); + table = conn.getTable(TableName.valueOf(hbaseTable)); scanner = table.getScanner(scan); int count = 0; for (Result r : scanner) { diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java index a115753..e950e5b 100644 --- a/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java +++ b/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java @@ -24,7 +24,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.Connection; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.kv.RowValueDecoder; @@ -52,7 +52,7 @@ public class SerializedHBaseTupleIterator implements ITupleIterator { private ITupleIterator segmentIterator; private int scanCount; - public SerializedHBaseTupleIterator(HConnection conn, List segmentKeyRanges, CubeInstance cube, Collection dimensions, TupleFilter filter, Collection groupBy, Collection rowValueDecoders, StorageContext context) { + public SerializedHBaseTupleIterator(Connection conn, List segmentKeyRanges, CubeInstance cube, Collection dimensions, TupleFilter filter, Collection groupBy, Collection rowValueDecoders, StorageContext context) { this.context = context; int limit = context.getLimit(); diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java index 7a0ab15..87f9354 100644 --- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java +++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java @@ -26,8 +26,9 @@ import java.util.List; import java.util.Map; import org.apache.commons.io.IOUtils; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.TableName; +import 
org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.ipc.BlockingRpcCallback; import org.apache.hadoop.hbase.ipc.ServerRpcController; @@ -79,14 +80,14 @@ public class EndpointTupleIterator implements ITupleIterator { Iterator> regionResponsesIterator = null; ITupleIterator tupleIterator = null; - HTableInterface table = null; + Table table = null; int rowsInAllMetric = 0; - public EndpointTupleIterator(IISegment segment, TupleFilter rootFilter, Collection groupBy, List measures, StorageContext context, HConnection conn) throws Throwable { + public EndpointTupleIterator(IISegment segment, TupleFilter rootFilter, Collection groupBy, List measures, StorageContext context, Connection conn) throws Throwable { String tableName = segment.getStorageLocationIdentifier(); - table = conn.getTable(tableName); + table = conn.getTable(TableName.valueOf(tableName)); factTableName = segment.getIIDesc().getFactTableName(); if (rootFilter == null) { @@ -212,7 +213,7 @@ public class EndpointTupleIterator implements ITupleIterator { } //TODO : async callback - private Iterator> getResults(final IIProtos.IIRequest request, HTableInterface table) throws Throwable { + private Iterator> getResults(final IIProtos.IIRequest request, Table table) throws Throwable { Map> results = table.coprocessorService(IIProtos.RowsService.class, null, null, new Batch.Call>() { public List call(IIProtos.RowsService rowsService) throws IOException { ServerRpcController controller = new ServerRpcController(); diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java index f0f7ed5..fa2a7c1 100644 --- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java +++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java @@ -23,9 +23,9 @@ import java.util.Collection; import java.util.Map; import java.util.Set; -import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.debug.BackdoorToggles; @@ -58,7 +58,7 @@ public class ObserverEnabler { static final Map CUBE_OVERRIDES = Maps.newConcurrentMap(); public static ResultScanner scanWithCoprocessorIfBeneficial(CubeSegment segment, Cuboid cuboid, TupleFilter tupleFiler, // - Collection groupBy, Collection rowValueDecoders, StorageContext context, HTableInterface table, Scan scan) throws IOException { + Collection groupBy, Collection rowValueDecoders, StorageContext context, Table table, Scan scan) throws IOException { if (context.isCoprocessorEnabled() == false) { return table.getScanner(scan); diff --git a/storage/src/test/java/org/apache/kylin/storage/hbase/InvertedIndexHBaseTest.java b/storage/src/test/java/org/apache/kylin/storage/hbase/InvertedIndexHBaseTest.java index 10a1a82..897f130 100644 --- a/storage/src/test/java/org/apache/kylin/storage/hbase/InvertedIndexHBaseTest.java +++ b/storage/src/test/java/org/apache/kylin/storage/hbase/InvertedIndexHBaseTest.java @@ -20,9 +20,10 @@ package org.apache.kylin.storage.hbase; import java.util.List; +import 
org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.common.util.HBaseMetadataTestCase; @@ -49,7 +50,7 @@ public class InvertedIndexHBaseTest extends HBaseMetadataTestCase { IIInstance ii; IISegment seg; - HConnection hconn; + Connection hconn; TableRecordInfo info; @@ -62,13 +63,14 @@ public class InvertedIndexHBaseTest extends HBaseMetadataTestCase { String hbaseUrl = KylinConfig.getInstanceFromEnv().getStorageUrl(); Configuration hconf = HadoopUtil.newHBaseConfiguration(hbaseUrl); - hconn = HConnectionManager.createConnection(hconf); + hconn = ConnectionFactory.createConnection(hconf); this.info = new TableRecordInfo(seg); } @After public void after() throws Exception { + IOUtils.closeQuietly(hconn); this.cleanupTestMetadata(); } -- 1.9.1
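
The recurring read-path change across AclService, QueryService and UserService is the same shape: HTableInterface becomes Table, the table is looked up through TableName.valueOf(...), and only the Table is closed, because the Connection returned by HBaseConnection.get(...) is cached and shared. A minimal sketch of that pattern; the table name, family and row key below are placeholders, not values from the patch:

    Connection conn = HBaseConnection.get(hbaseUrl);   // cached by Kylin; callers never close it
    Table table = null;
    try {
        table = conn.getTable(TableName.valueOf("kylin_metadata_acl"));
        Get get = new Get(Bytes.toBytes("some-row-key"));
        get.addFamily(Bytes.toBytes("i"));
        Result result = table.get(get);
        // ... deserialize the row ...
    } finally {
        IOUtils.closeQuietly(table);                    // close the per-use Table, not the shared Connection
    }
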
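On the write path, Put.add(...) becomes Put.addColumn(...), Delete.deleteFamily(...) becomes Delete.addFamily(...), and the htable.flushCommits() calls are dropped: a 1.x Table has no client-side write buffer, so put() and delete() are sent immediately. Where buffering is actually wanted, BufferedMutator is the replacement. A sketch under those assumptions; rowKey, value, table, conn and tableName are placeholders assumed to be in scope:

    Put put = new Put(Bytes.toBytes(rowKey));
    put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("c"), Bytes.toBytes(value));
    table.put(put);                        // flushed by the call itself; no flushCommits()

    Delete delete = new Delete(Bytes.toBytes(rowKey));
    delete.addFamily(Bytes.toBytes("f"));  // was deleteFamily() on the old API
    table.delete(delete);

    // Optional explicit buffering, the 1.x successor to the old write buffer + flushCommits()
    try (BufferedMutator mutator = conn.getBufferedMutator(TableName.valueOf(tableName))) {
        mutator.mutate(put);
    }                                      // buffered mutations are flushed when the mutator closes
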
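Administrative calls follow the shape used in MonitorMetaManager.creatTable() and HBaseConnection.createHTableIfNeeded(): the Admin comes from Connection.getAdmin() rather than new HBaseAdmin(conf), table names are wrapped in TableName, and the Admin is closed in a finally block. A sketch, assuming a single placeholder column family "f":

    Admin admin = null;
    try {
        admin = connection.getAdmin();
        TableName table = TableName.valueOf(tableName);
        if (!admin.tableExists(table)) {
            HTableDescriptor desc = new HTableDescriptor(table);
            desc.addFamily(new HColumnDescriptor("f"));
            admin.createTable(desc);
        }
    } finally {
        IOUtils.closeQuietly(admin);       // Admin is Closeable in the 1.x client
    }
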
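The few call sites that open their own connection (PingHBaseCLI, InvertedIndexHBaseTest) switch from HConnectionManager.createConnection(conf) to ConnectionFactory.createConnection(conf); since that connection is owned locally it must be closed along with the scanner and table, which is what the new IOUtils.closeQuietly(hconn) in the test's @After reflects. A sketch of the standalone scan pattern; the configuration source is a placeholder, the table name is the one used by HBaseRowDigestTest:

    Configuration hconf = HBaseConfiguration.create();
    Connection conn = null;
    Table table = null;
    ResultScanner scanner = null;
    try {
        conn = ConnectionFactory.createConnection(hconf);   // heavyweight: create once, reuse
        table = conn.getTable(TableName.valueOf("KYLIN_II_YTYWP3CQGJ"));
        scanner = table.getScanner(new Scan());
        for (Result r : scanner) {
            for (Cell cell : r.rawCells()) {
                // ... inspect CellUtil.cloneValue(cell) ...
            }
        }
    } finally {
        IOUtils.closeQuietly(scanner);
        IOUtils.closeQuietly(table);
        IOUtils.closeQuietly(conn);        // locally owned, unlike HBaseConnection's cached pool
    }
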
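CubeService.getHTableInfo() no longer constructs an HTable itself; it hands the table name plus the cached Connection to HBaseRegionSizeCalculator, whose reworked constructor is part of this patch. The calculator's internals are not shown in this hunk, but the region-count side of that information is available straight from the 1.x Connection, for example (table name is a placeholder):

    try (RegionLocator locator = hbaseConnection.getRegionLocator(TableName.valueOf(tableName))) {
        int regionCount = locator.getAllRegionLocations().size();
        // region sizes themselves typically come from RegionLoad via Admin#getClusterStatus(),
        // which is the kind of lookup the calculator wraps
    }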