From dffbbf3bb4ff4414a751d5635974362513b36513 Mon Sep 17 00:00:00 2001
From: shaofengshi
Date: Wed, 23 Mar 2016 17:07:05 +0800
Subject: [PATCH] KYLIN-1528 Create a branch for v1.5 with HBase 1.1 API

---
 .../kylin/job/hadoop/invertedindex/IITest.java     | 10 ++-
 .../org/apache/kylin/common/KylinConfigBase.java   |  3 +
 examples/test_case_data/sandbox/hbase-site.xml     | 19 +----
 examples/test_case_data/sandbox/kylin_job_conf.xml | 86 +++++++++-----------
 examples/test_case_data/sandbox/mapred-site.xml    | 23 +++---
 .../kylin/provision/BuildCubeWithEngine.java       |  9 +-
 .../apache/kylin/provision/BuildIIWithStream.java  | 13 +--
 .../storage/hbase/ii/ITInvertedIndexHBaseTest.java |  9 +-
 pom.xml                                            | 18 ++--
 .../kylin/rest/security/AclHBaseStorage.java       |  4 +-
 .../kylin/rest/security/MockAclHBaseStorage.java   |  8 +-
 .../org/apache/kylin/rest/security/MockHTable.java | 95 ++++------------------
 .../kylin/rest/security/RealAclHBaseStorage.java   |  9 +-
 .../org/apache/kylin/rest/service/AclService.java  | 25 +++---
 .../org/apache/kylin/rest/service/CubeService.java | 36 +++-----
 .../apache/kylin/rest/service/QueryService.java    | 21 +++--
 .../org/apache/kylin/rest/service/UserService.java | 16 ++--
 .../kylin/storage/hbase/HBaseConnection.java       | 36 ++++----
 .../kylin/storage/hbase/HBaseResourceStore.java    | 31 ++++---
 .../hbase/cube/v1/CubeSegmentTupleIterator.java    | 11 +--
 .../storage/hbase/cube/v1/CubeStorageQuery.java    |  4 +-
 .../hbase/cube/v1/HBaseClientKVIterator.java       | 11 +--
 .../hbase/cube/v1/RegionScannerAdapter.java        | 11 ++-
 .../cube/v1/SerializedHBaseTupleIterator.java      |  4 +-
 .../observer/AggregateRegionObserver.java          |  4 +-
 .../coprocessor/observer/AggregationScanner.java   | 15 +++-
 .../observer/ObserverAggregationCache.java         | 15 ++--
 .../v1/coprocessor/observer/ObserverEnabler.java   |  4 +-
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java        | 11 +--
 .../storage/hbase/cube/v2/CubeHBaseScanRPC.java    |  9 +-
 .../v2/coprocessor/endpoint/CubeVisitService.java  |  4 +-
 .../kylin/storage/hbase/ii/IICreateHTableJob.java  | 13 +--
 .../hbase/ii/InvertedIndexStorageQuery.java        |  6 +-
 .../endpoint/EndpointTupleIterator.java            | 13 +--
 .../hbase/ii/coprocessor/endpoint/IIEndpoint.java  |  4 +-
 .../kylin/storage/hbase/steps/CubeHTableUtil.java  | 11 ++-
 .../storage/hbase/steps/DeprecatedGCStep.java      | 25 +++---
 .../storage/hbase/steps/HBaseCuboidWriter.java     |  7 +-
 .../storage/hbase/steps/HBaseStreamingOutput.java  |  9 +-
 .../kylin/storage/hbase/steps/MergeGCStep.java     | 25 +++---
 .../kylin/storage/hbase/util/CleanHtableCLI.java   | 15 ++--
 .../kylin/storage/hbase/util/CubeMigrationCLI.java | 36 ++++----
 .../storage/hbase/util/CubeMigrationCheckCLI.java  | 18 ++--
 .../storage/hbase/util/DeployCoprocessorCLI.java   | 23 +++---
 .../storage/hbase/util/ExtendCubeToHybridCLI.java  |  8 +-
 .../hbase/util/GridTableHBaseBenchmark.java        | 34 ++++----
 .../kylin/storage/hbase/util/HBaseClean.java       | 19 +++--
 .../hbase/util/HBaseRegionSizeCalculator.java      | 42 ++++------
 .../kylin/storage/hbase/util/HBaseUsage.java       | 10 ++-
 .../storage/hbase/util/HbaseStreamingInput.java    | 32 ++++----
 .../storage/hbase/util/HtableAlterMetadataCLI.java | 10 ++-
 .../storage/hbase/util/OrphanHBaseCleanJob.java    | 18 ++--
 .../kylin/storage/hbase/util/PingHBaseCLI.java     | 15 ++--
 .../kylin/storage/hbase/util/RowCounterCLI.java    | 11 +--
 .../storage/hbase/util/StorageCleanupJob.java      | 16 ++--
 .../storage/hbase/util/UpdateHTableHostCLI.java    | 17 ++--
 .../observer/AggregateRegionObserverTest.java      | 31 +++----
 .../v1/filter/TestFuzzyRowFilterV2EndToEnd.java    |  5 +-
 58 files changed, 502 insertions(+), 545 deletions(-)
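One pattern repeats through every hunk below: HConnection and HTableInterface give way to Connection and Table, tables are addressed through TableName.valueOf(...), writes use Put.addColumn(...) instead of Put.add(...), and the client-side write buffer (flushCommits()) disappears. Before reading file by file, here is a minimal sketch of the 1.1-style call sequence the patch converges on, assuming an existing table EXAMPLE with column family f; it is illustrative only and not part of the patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;

    public class HBase11ClientSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            // Connection is heavyweight and thread-safe; create once and share.
            try (Connection conn = ConnectionFactory.createConnection(conf)) {
                // Table is lightweight and NOT thread-safe; obtain per use, close after.
                try (Table table = conn.getTable(TableName.valueOf("EXAMPLE"))) {
                    Put put = new Put(Bytes.toBytes("row1"));
                    // 0.98 wrote put.add(family, qualifier, value)
                    put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
                    table.put(put); // no flushCommits(): Table writes through immediately
                }
            }
        }
    }

Connection being shared and Table being cheap per-use is why the patch can delete the per-call HTable construction throughout the Kylin code base.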
diff --git a/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
index da25143..5d2cfc4 100644
--- a/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/hadoop/invertedindex/IITest.java
@@ -38,6 +38,7 @@ import org.apache.commons.lang.NotImplementedException;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.kylin.common.util.FIFOIterable;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.common.util.Pair;
@@ -222,6 +223,11 @@ public class IITest extends LocalFileMetadataTestCase {
         }
 
         @Override
+        public int getBatch() {
+            return -1;
+        }
+
+        @Override
         public boolean nextRaw(List<Cell> result) throws IOException {
             if (iiRowIterator.hasNext()) {
                 IIRow iiRow = iiRowIterator.next();
@@ -233,7 +239,7 @@
         }
 
         @Override
-        public boolean nextRaw(List<Cell> result, int limit) throws IOException {
+        public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException {
             throw new NotImplementedException();
         }
 
@@ -243,7 +249,7 @@
         }
 
         @Override
-        public boolean next(List<Cell> result, int limit) throws IOException {
+        public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
             throw new NotImplementedException();
         }
 
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index e3a73e8..38abc9a 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -579,4 +579,7 @@ public class KylinConfigBase implements Serializable {
         return getOptional("kylin.hive.beeline.params", "");
     }
 
+    public String getPatchedFuzzyRowFilterVersion() {
+        return this.getOptional("kylin.hbase.filter.fuzzy.row.filter.version", "1.1.3");
+    }
 }
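The IITest stub above tracks the coprocessor-facing change: RegionScanner's next()/nextRaw() lose their int-limit overloads in favor of ScannerContext, and getBatch() is new (stubs that do not batch return -1, as in the test). A delegating sketch of just the changed surface; RegionScannerShim is a hypothetical name, not a class in this patch, and it is left abstract because the unchanged methods are omitted:

    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.regionserver.RegionScanner;
    import org.apache.hadoop.hbase.regionserver.ScannerContext;

    // Wrapper showing only the RegionScanner methods whose signatures
    // changed between HBase 0.98 and 1.1; everything else is inherited.
    public abstract class RegionScannerShim implements RegionScanner {
        protected final RegionScanner delegate;

        protected RegionScannerShim(RegionScanner delegate) {
            this.delegate = delegate;
        }

        // 0.98: boolean next(List<Cell> result, int limit)
        @Override
        public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
            return delegate.next(result, scannerContext);
        }

        // 0.98: boolean nextRaw(List<Cell> result, int limit)
        @Override
        public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException {
            return delegate.nextRaw(result, scannerContext);
        }

        // New in 1.x: the configured per-row batch limit, or -1 when unbatched.
        @Override
        public int getBatch() {
            return delegate.getBatch();
        }
    }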
diff --git a/examples/test_case_data/sandbox/hbase-site.xml b/examples/test_case_data/sandbox/hbase-site.xml
index 46d5345..734908e 100644
--- a/examples/test_case_data/sandbox/hbase-site.xml
+++ b/examples/test_case_data/sandbox/hbase-site.xml
@@ -190,22 +190,5 @@
     <property>
         <name>zookeeper.znode.parent</name>
        <value>/hbase-unsecure</value>
     </property>
-    <property>
-        <name>hbase.client.pause</name>
-        <value>100</value>
-        <description>General client pause value. Used mostly as value to wait
-            before running a retry of a failed get, region lookup, etc.
-            See hbase.client.retries.number for description of how we backoff from
-            this initial pause amount and how this pause works w/ retries.</description>
-    </property>
-    <property>
-        <name>hbase.client.retries.number</name>
-        <value>5</value>
-        <description>Maximum retries. Used as maximum for all retryable
-            operations such as the getting of a cell's value, starting a row update,
-            etc. Retry interval is a rough function based on hbase.client.pause. At
-            first we retry at this interval but then with backoff, we pretty quickly reach
-            retrying every ten seconds. See HConstants#RETRY_BACKOFF for how the backup
-            ramps up. Change this setting and hbase.client.pause to suit your workload.</description>
-    </property>
+</configuration>
diff --git a/examples/test_case_data/sandbox/kylin_job_conf.xml b/examples/test_case_data/sandbox/kylin_job_conf.xml
index bd947af..81704f2 100644
--- a/examples/test_case_data/sandbox/kylin_job_conf.xml
+++ b/examples/test_case_data/sandbox/kylin_job_conf.xml
@@ -1,20 +1,18 @@
+
@@ -26,44 +24,41 @@
-    <property>
-        <name>mapreduce.map.maxattempts</name>
-        <value>2</value>
-    </property>
+
+    <!--
+    <property>
+        <name>mapred.output.compression.type</name>
+        <value>BLOCK</value>
+        <description>The compression type to use for job outputs</description>
+    </property>
+    !-->
 
     <property>
         <name>mapreduce.job.max.split.locations</name>
@@ -76,5 +71,4 @@
         <value>2</value>
         <description>Block replication</description>
     </property>
-
 </configuration>
\ No newline at end of file
diff --git a/examples/test_case_data/sandbox/mapred-site.xml b/examples/test_case_data/sandbox/mapred-site.xml
index 18f6feb..ff1c7eb 100644
--- a/examples/test_case_data/sandbox/mapred-site.xml
+++ b/examples/test_case_data/sandbox/mapred-site.xml
@@ -18,7 +18,7 @@
     <property>
         <name>io.sort.mb</name>
-        <value>128</value>
+        <value>64</value>
     </property>
@@ -28,12 +28,12 @@
     <property>
         <name>mapred.job.map.memory.mb</name>
-        <value>512</value>
+        <value>250</value>
     </property>
 
     <property>
         <name>mapred.job.reduce.memory.mb</name>
-        <value>512</value>
+        <value>250</value>
     </property>
@@ -58,7 +58,7 @@
     <property>
         <name>mapreduce.application.classpath</name>
-        <value>/tmp/kylin/*,$HADOOP_CONF_DIR,/usr/hdp/${hdp.version}/hbase/lib/hbase-common.jar,/usr/hdp/current/hive-client/conf/,$PWD/mr-framework/hadoop/share/hadoop/mapreduce/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/lib/*:$PWD/mr-framework/hadoop/share/hadoop/common/*:$PWD/mr-framework/hadoop/share/hadoop/common/lib/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/lib/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/lib/*:/usr/hdp/${hdp.version}/hadoop/lib/hadoop-lzo-0.6.0.${hdp.version}.jar:/usr/hdp/${hdp.version}/hadoop/lib/snappy-java-1.0.4.1.jar:/etc/hadoop/conf/secure</value>
+        <value>/tmp/kylin/*,$HADOOP_CONF_DIR,/usr/hdp/${hdp.version}/hbase/lib/hbase-common.jar,/usr/hdp/current/hive-client/conf/,$PWD/mr-framework/hadoop/share/hadoop/mapreduce/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/lib/*:$PWD/mr-framework/hadoop/share/hadoop/common/*:$PWD/mr-framework/hadoop/share/hadoop/common/lib/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/lib/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/lib/*:/usr/hdp/${hdp.version}/hadoop/lib/hadoop-lzo-0.6.0.${hdp.version}.jar:/etc/hadoop/conf/secure</value>
     </property>
@@ -81,10 +81,9 @@
         <value>false</value>
     </property>
-
     <property>
         <name>mapreduce.job.reduce.slowstart.completedmaps</name>
-        <value>1</value>
+        <value>0.05</value>
     </property>
@@ -114,7 +113,7 @@
     <property>
         <name>mapreduce.map.java.opts</name>
-        <value>-Xmx512m</value>
+        <value>-Xmx200m</value>
     </property>
@@ -124,7 +123,7 @@
     <property>
         <name>mapreduce.map.memory.mb</name>
-        <value>512</value>
+        <value>250</value>
     </property>
@@ -169,7 +168,7 @@
     <property>
         <name>mapreduce.reduce.memory.mb</name>
-        <value>512</value>
+        <value>250</value>
     </property>
@@ -219,7 +218,7 @@
     <property>
         <name>mapreduce.task.io.sort.mb</name>
-        <value>128</value>
+        <value>64</value>
     </property>
@@ -234,7 +233,7 @@
     <property>
        <name>yarn.app.mapreduce.am.command-opts</name>
-        <value>-Xmx512m</value>
+        <value>-Xmx200m</value>
     </property>
@@ -244,7 +243,7 @@
     <property>
         <name>yarn.app.mapreduce.am.resource.mb</name>
-        <value>512</value>
+        <value>250</value>
     </property>
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
index cfefef3..0446443 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
@@ -37,8 +37,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.AbstractKylinTestCase;
@@ -60,6
+59,7 @@ import org.apache.kylin.job.execution.ExecutableState; import org.apache.kylin.job.impl.threadpool.DefaultScheduler; import org.apache.kylin.job.manager.ExecutableManager; import org.apache.kylin.metadata.model.IEngineAware; +import org.apache.kylin.storage.hbase.HBaseConnection; import org.apache.kylin.storage.hbase.util.HBaseRegionSizeCalculator; import org.apache.kylin.storage.hbase.util.StorageCleanupJob; import org.apache.kylin.storage.hbase.util.ZookeeperJobLock; @@ -367,10 +367,9 @@ public class BuildCubeWithEngine { } private void checkHFilesInHBase(CubeSegment segment) throws IOException { - Configuration conf = HBaseConfiguration.create(HadoopUtil.getCurrentConfiguration()); + Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()); String tableName = segment.getStorageLocationIdentifier(); - HTable table = new HTable(conf, tableName); - HBaseRegionSizeCalculator cal = new HBaseRegionSizeCalculator(table); + HBaseRegionSizeCalculator cal = new HBaseRegionSizeCalculator(tableName, conn); Map sizeMap = cal.getRegionSizeMap(); long totalSize = 0; for (Long size : sizeMap.values()) { diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java index 9b7cd14..3616ff9 100644 --- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java +++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildIIWithStream.java @@ -45,7 +45,8 @@ import java.util.TimeZone; import java.util.UUID; import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.util.ToolRunner; @@ -200,7 +201,7 @@ public class BuildIIWithStream { } } final IISegment segment = createSegment(iiName); - final HTableInterface htable = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getTable(segment.getStorageLocationIdentifier()); + final Table htable = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getTable(TableName.valueOf(segment.getStorageLocationIdentifier())); String[] args = new String[] { "-iiname", iiName, "-htablename", segment.getStorageLocationIdentifier() }; ToolRunner.run(new IICreateHTableJob(), args); @@ -238,7 +239,7 @@ public class BuildIIWithStream { } } - private void build(SliceBuilder sliceBuilder, StreamingBatch batch, HTableInterface htable) throws IOException { + private void build(SliceBuilder sliceBuilder, StreamingBatch batch, Table htable) throws IOException { final Slice slice = sliceBuilder.buildSlice(batch); try { loadToHBase(htable, slice, new IIKeyValueCodec(slice.getInfo())); @@ -247,17 +248,17 @@ public class BuildIIWithStream { } } - private void loadToHBase(HTableInterface hTable, Slice slice, IIKeyValueCodec codec) throws IOException { + private void loadToHBase(Table hTable, Slice slice, IIKeyValueCodec codec) throws IOException { List data = Lists.newArrayList(); for (IIRow row : codec.encodeKeyValue(slice)) { final byte[] key = row.getKey().get(); final byte[] value = row.getValue().get(); Put put = new Put(key); - put.add(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_QUALIFIER_BYTES, value); + put.addColumn(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_QUALIFIER_BYTES, value); final ImmutableBytesWritable dictionary = row.getDictionary(); final 
byte[] dictBytes = dictionary.get(); if (dictionary.getOffset() == 0 && dictionary.getLength() == dictBytes.length) { - put.add(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_DICTIONARY_BYTES, dictBytes); + put.addColumn(IIDesc.HBASE_FAMILY_BYTES, IIDesc.HBASE_DICTIONARY_BYTES, dictBytes); } else { throw new RuntimeException("dict offset should be 0, and dict length should be " + dictBytes.length + " but they are" + dictionary.getOffset() + " " + dictionary.getLength()); } diff --git a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ii/ITInvertedIndexHBaseTest.java b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ii/ITInvertedIndexHBaseTest.java index b5be703..d0ec67f 100644 --- a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ii/ITInvertedIndexHBaseTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ii/ITInvertedIndexHBaseTest.java @@ -20,9 +20,7 @@ package org.apache.kylin.storage.hbase.ii; import java.util.List; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.client.Connection; import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.invertedindex.IIInstance; import org.apache.kylin.invertedindex.IIManager; @@ -49,7 +47,7 @@ public class ITInvertedIndexHBaseTest extends HBaseMetadataTestCase { IIInstance ii; IISegment seg; - HConnection hconn; + Connection hconn; TableRecordInfo info; @@ -60,8 +58,7 @@ public class ITInvertedIndexHBaseTest extends HBaseMetadataTestCase { this.ii = IIManager.getInstance(getTestConfig()).getII("test_kylin_ii_left_join"); this.seg = ii.getFirstSegment(); - Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration(); - hconn = HConnectionManager.createConnection(hconf); + hconn = HBaseConnection.get(getTestConfig().getStorageUrl()); this.info = new TableRecordInfo(seg); } diff --git a/pom.xml b/pom.xml index f878d8b..fd72780 100644 --- a/pom.xml +++ b/pom.xml @@ -45,12 +45,12 @@ UTF-8 - 2.6.0 - 2.6.0 + 2.7.1 + 2.7.1 3.4.6 - 0.14.0 - 0.14.0 - 0.98.4-hadoop2 + 1.2.1 + 1.2.1 + 1.1.1 0.8.1 @@ -62,6 +62,7 @@ 1.2 + 1.4 2.6 3.1 3.2.1 @@ -101,7 +102,7 @@ 1.6.0 - 2.6.0 + 2.7.1 jacoco @@ -322,6 +323,11 @@ ${commons-cli.version} + commons-codec + commons-codec + ${commons-codec.version} + + commons-lang commons-lang ${commons-lang.version} diff --git a/server/src/main/java/org/apache/kylin/rest/security/AclHBaseStorage.java b/server/src/main/java/org/apache/kylin/rest/security/AclHBaseStorage.java index 2220113..0e71cd5 100644 --- a/server/src/main/java/org/apache/kylin/rest/security/AclHBaseStorage.java +++ b/server/src/main/java/org/apache/kylin/rest/security/AclHBaseStorage.java @@ -20,7 +20,7 @@ package org.apache.kylin.rest.security; import java.io.IOException; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Table; /** */ @@ -37,6 +37,6 @@ public interface AclHBaseStorage { String prepareHBaseTable(Class clazz) throws IOException; - HTableInterface getTable(String tableName) throws IOException; + Table getTable(String tableName) throws IOException; } diff --git a/server/src/main/java/org/apache/kylin/rest/security/MockAclHBaseStorage.java b/server/src/main/java/org/apache/kylin/rest/security/MockAclHBaseStorage.java index 193f5a8..933c49d 100644 --- a/server/src/main/java/org/apache/kylin/rest/security/MockAclHBaseStorage.java +++ b/server/src/main/java/org/apache/kylin/rest/security/MockAclHBaseStorage.java @@ -21,7 +21,7 @@ 
package org.apache.kylin.rest.security; import java.io.IOException; import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Table; import org.apache.kylin.rest.service.AclService; import org.apache.kylin.rest.service.UserService; @@ -29,8 +29,8 @@ import org.apache.kylin.rest.service.UserService; */ public class MockAclHBaseStorage implements AclHBaseStorage { - private HTableInterface mockedAclTable; - private HTableInterface mockedUserTable; + private Table mockedAclTable; + private Table mockedUserTable; private static final String aclTableName = "MOCK-ACL-TABLE"; private static final String userTableName = "MOCK-USER-TABLE"; @@ -49,7 +49,7 @@ public class MockAclHBaseStorage implements AclHBaseStorage { } @Override - public HTableInterface getTable(String tableName) throws IOException { + public Table getTable(String tableName) throws IOException { if (StringUtils.equals(tableName, aclTableName)) { return mockedAclTable; } else if (StringUtils.equals(tableName, userTableName)) { diff --git a/server/src/main/java/org/apache/kylin/rest/security/MockHTable.java b/server/src/main/java/org/apache/kylin/rest/security/MockHTable.java index d0aa0ed..972eea9 100644 --- a/server/src/main/java/org/apache/kylin/rest/security/MockHTable.java +++ b/server/src/main/java/org/apache/kylin/rest/security/MockHTable.java @@ -51,7 +51,7 @@ import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; @@ -91,7 +91,7 @@ import com.google.protobuf.ServiceException; *
 *     <li>remove some methods for loading data, checking values ...</li>
 * </ul>
 */
-public class MockHTable implements HTableInterface {
+public class MockHTable implements Table {
 
     private final String tableName;
     private final List<String> columnFamilies = new ArrayList<>();
@@ -114,14 +114,6 @@ public class MockHTable implements HTableInterface {
         this.columnFamilies.add(columnFamily);
     }
 
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public byte[] getTableName() {
-        return tableName.getBytes();
-    }
-
     @Override
     public TableName getName() {
         return null;
@@ -200,8 +192,8 @@
     }
 
     @Override
-    public Boolean[] exists(List<Get> gets) throws IOException {
-        return new Boolean[0];
+    public boolean[] existsAll(List<Get> list) throws IOException {
+        return new boolean[0];
     }
 
     /**
@@ -306,15 +298,6 @@
     * {@inheritDoc}
     */
    @Override
-    public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
-        // FIXME: implement
-        return null;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
    public ResultScanner getScanner(Scan scan) throws IOException {
        final List<Result> ret = new ArrayList<Result>();
        byte[] st = scan.getStartRow();
@@ -446,7 +429,7 @@
             */
        }
        if (filter.hasFilterRow() && !filteredOnRowKey) {
-            filter.filterRow(nkvs);
+            filter.filterRow();
        }
        if (filter.filterRow() || filteredOnRowKey) {
            nkvs.clear();
@@ -535,6 +518,11 @@
        return false;
    }
 
+    @Override
+    public boolean checkAndPut(byte[] bytes, byte[] bytes1, byte[] bytes2, CompareFilter.CompareOp compareOp, byte[] bytes3, Put put) throws IOException {
+        return false;
+    }
+
    /**
     * {@inheritDoc}
     */
@@ -555,7 +543,7 @@
                continue;
            }
            for (KeyValue kv : delete.getFamilyMap().get(family)) {
-                if (kv.isDeleteFamily()) {
+                if (kv.isDelete()) {
                    data.get(row).get(kv.getFamily()).clear();
                } else {
                    data.get(row).get(kv.getFamily()).remove(kv.getQualifier());
@@ -592,6 +580,11 @@
        return false;
    }
 
+    @Override
+    public boolean checkAndDelete(byte[] bytes, byte[] bytes1, byte[] bytes2, CompareFilter.CompareOp compareOp, byte[] bytes3, Delete delete) throws IOException {
+        return false;
+    }
+
    /**
     * {@inheritDoc}
     */
@@ -605,7 +598,7 @@
     */
    @Override
    public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount) throws IOException {
-        return incrementColumnValue(row, family, qualifier, amount, true);
+        return incrementColumnValue(row, family, qualifier, amount, null);
    }
 
    @Override
@@ -617,37 +610,6 @@
     * {@inheritDoc}
     */
    @Override
-    public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, boolean writeToWAL) throws IOException {
-        if (check(row, family, qualifier, null)) {
-            Put put = new Put(row);
-            put.add(family, qualifier, Bytes.toBytes(amount));
-            put(put);
-            return amount;
-        }
-        long newValue = Bytes.toLong(data.get(row).get(family).get(qualifier).lastEntry().getValue()) + amount;
-        data.get(row).get(family).get(qualifier).put(System.currentTimeMillis(), Bytes.toBytes(newValue));
-        return newValue;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public boolean isAutoFlush() {
-        return true;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void flushCommits() throws IOException {
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
    public void close() throws IOException {
    }
 
@@ -673,29 +635,6 @@ public class MockHTable implements HTableInterface {
     * {@inheritDoc}
     */
    @Override
-    public void setAutoFlush(boolean autoFlush) {
-        throw new NotImplementedException();
-
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
-        throw new NotImplementedException();
-
-    }
-
-    @Override
-    public void setAutoFlushTo(boolean autoFlush) {
-        throw new NotImplementedException();
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
    public long getWriteBufferSize() {
        throw new NotImplementedException();
    }
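MockHTable drops getTableName(), getRowOrBefore(), the writeToWAL increment overload, and the whole isAutoFlush()/flushCommits()/setAutoFlush() family because the Table interface no longer declares them; accordingly, the service classes later in this patch simply delete their flushCommits() calls. Code that still wants client-side write buffering is expected to use a BufferedMutator obtained from the shared Connection. A minimal sketch, again assuming table EXAMPLE with family f, and not something the patch itself adds:

    import java.io.IOException;

    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.BufferedMutator;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    public final class BufferedWriteSketch {
        private BufferedWriteSketch() {
        }

        static void writeBuffered(Connection conn) throws IOException {
            // Replaces HTableInterface.setAutoFlush(false) + flushCommits():
            // mutations are buffered client-side and sent in batches.
            try (BufferedMutator mutator = conn.getBufferedMutator(TableName.valueOf("EXAMPLE"))) {
                Put put = new Put(Bytes.toBytes("row1"));
                put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
                mutator.mutate(put);
                mutator.flush(); // explicit flush; close() also flushes
            }
        }
    }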
diff --git a/server/src/main/java/org/apache/kylin/rest/security/RealAclHBaseStorage.java b/server/src/main/java/org/apache/kylin/rest/security/RealAclHBaseStorage.java
index 5a48e83..d40bdf3 100644
--- a/server/src/main/java/org/apache/kylin/rest/security/RealAclHBaseStorage.java
+++ b/server/src/main/java/org/apache/kylin/rest/security/RealAclHBaseStorage.java
@@ -21,7 +21,8 @@ package org.apache.kylin.rest.security;
 import java.io.IOException;
 
 import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.rest.service.AclService;
 import org.apache.kylin.rest.service.UserService;
@@ -56,11 +57,11 @@ public class RealAclHBaseStorage implements AclHBaseStorage {
     }
 
     @Override
-    public HTableInterface getTable(String tableName) throws IOException {
+    public Table getTable(String tableName) throws IOException {
         if (StringUtils.equals(tableName, aclTableName)) {
-            return HBaseConnection.get(hbaseUrl).getTable(aclTableName);
+            return HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(aclTableName));
         } else if (StringUtils.equals(tableName, userTableName)) {
-            return HBaseConnection.get(hbaseUrl).getTable(userTableName);
+            return HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName));
         } else {
             throw new IllegalStateException("getTable failed" + tableName);
         }
diff --git a/server/src/main/java/org/apache/kylin/rest/service/AclService.java b/server/src/main/java/org/apache/kylin/rest/service/AclService.java
index 58e093c..a03ff5e 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/AclService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/AclService.java
@@ -33,7 +33,7 @@ import javax.annotation.PostConstruct;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
@@ -124,7 +124,7 @@ public class AclService implements MutableAclService {
     @Override
     public List<ObjectIdentity> findChildren(ObjectIdentity parentIdentity) {
         List<ObjectIdentity> oids = new ArrayList<ObjectIdentity>();
-        HTableInterface htable = null;
+        Table htable = null;
         try {
             htable = aclHBaseStorage.getTable(aclTableName);
 
@@ -173,7 +173,7 @@ public class AclService implements MutableAclService {
     @Override
     public Map<ObjectIdentity, Acl> readAclsById(List<ObjectIdentity> oids, List<Sid> sids) throws NotFoundException {
         Map<ObjectIdentity, Acl> aclMaps = new HashMap<ObjectIdentity, Acl>();
-        HTableInterface htable = null;
+        Table htable = null;
         Result result = null;
         try {
             htable = aclHBaseStorage.getTable(aclTableName);
@@ -225,17 +225,16 @@
         Authentication auth =
SecurityContextHolder.getContext().getAuthentication(); PrincipalSid sid = new PrincipalSid(auth); - HTableInterface htable = null; + Table htable = null; try { htable = aclHBaseStorage.getTable(aclTableName); Put put = new Put(Bytes.toBytes(String.valueOf(objectIdentity.getIdentifier()))); - put.add(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_TYPE_COLUMN), Bytes.toBytes(objectIdentity.getType())); - put.add(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_OWNER_COLUMN), sidSerializer.serialize(new SidInfo(sid))); - put.add(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_ENTRY_INHERIT_COLUMN), Bytes.toBytes(true)); + put.addColumn(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_TYPE_COLUMN), Bytes.toBytes(objectIdentity.getType())); + put.addColumn(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_OWNER_COLUMN), sidSerializer.serialize(new SidInfo(sid))); + put.addColumn(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_ENTRY_INHERIT_COLUMN), Bytes.toBytes(true)); htable.put(put); - htable.flushCommits(); logger.debug("ACL of " + objectIdentity + " created successfully."); } catch (IOException e) { @@ -249,7 +248,7 @@ public class AclService implements MutableAclService { @Override public void deleteAcl(ObjectIdentity objectIdentity, boolean deleteChildren) throws ChildrenExistException { - HTableInterface htable = null; + Table htable = null; try { htable = aclHBaseStorage.getTable(aclTableName); @@ -265,7 +264,6 @@ public class AclService implements MutableAclService { } htable.delete(delete); - htable.flushCommits(); logger.debug("ACL of " + objectIdentity + " deleted successfully."); } catch (IOException e) { @@ -283,7 +281,7 @@ public class AclService implements MutableAclService { throw e; } - HTableInterface htable = null; + Table htable = null; try { htable = aclHBaseStorage.getTable(aclTableName); @@ -294,17 +292,16 @@ public class AclService implements MutableAclService { Put put = new Put(Bytes.toBytes(String.valueOf(acl.getObjectIdentity().getIdentifier()))); if (null != acl.getParentAcl()) { - put.add(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_PARENT_COLUMN), domainObjSerializer.serialize(new DomainObjectInfo(acl.getParentAcl().getObjectIdentity()))); + put.addColumn(Bytes.toBytes(AclHBaseStorage.ACL_INFO_FAMILY), Bytes.toBytes(ACL_INFO_FAMILY_PARENT_COLUMN), domainObjSerializer.serialize(new DomainObjectInfo(acl.getParentAcl().getObjectIdentity()))); } for (AccessControlEntry ace : acl.getEntries()) { AceInfo aceInfo = new AceInfo(ace); - put.add(Bytes.toBytes(AclHBaseStorage.ACL_ACES_FAMILY), Bytes.toBytes(aceInfo.getSidInfo().getSid()), aceSerializer.serialize(aceInfo)); + put.addColumn(Bytes.toBytes(AclHBaseStorage.ACL_ACES_FAMILY), Bytes.toBytes(aceInfo.getSidInfo().getSid()), aceSerializer.serialize(aceInfo)); } if (!put.isEmpty()) { htable.put(put); - htable.flushCommits(); logger.debug("ACL of " + acl.getObjectIdentity() + " updated successfully."); } diff --git a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java index 0c57d00..513311a 100644 --- a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java +++ b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java @@ -28,8 +28,7 @@ import java.util.Map; import java.util.Set; import 
java.util.WeakHashMap; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Connection; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; @@ -441,35 +440,24 @@ public class CubeService extends BasicService { if (htableInfoCache.containsKey(tableName)) { return htableInfoCache.get(tableName); } - - Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration(); - HTable table = null; + Connection conn = HBaseConnection.get(this.getConfig().getStorageUrl()); HBaseResponse hr = null; long tableSize = 0; int regionCount = 0; - try { - table = new HTable(hconf, tableName); - - HBaseRegionSizeCalculator cal = new HBaseRegionSizeCalculator(table); - Map sizeMap = cal.getRegionSizeMap(); - - for (long s : sizeMap.values()) { - tableSize += s; - } - - regionCount = sizeMap.size(); + HBaseRegionSizeCalculator cal = new HBaseRegionSizeCalculator(tableName, conn); + Map sizeMap = cal.getRegionSizeMap(); - // Set response. - hr = new HBaseResponse(); - hr.setTableSize(tableSize); - hr.setRegionCount(regionCount); - } finally { - if (null != table) { - table.close(); - } + for (long s : sizeMap.values()) { + tableSize += s; } + regionCount = sizeMap.size(); + + // Set response. + hr = new HBaseResponse(); + hr.setTableSize(tableSize); + hr.setRegionCount(regionCount); htableInfoCache.put(tableName, hr); return hr; diff --git a/server/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server/src/main/java/org/apache/kylin/rest/service/QueryService.java index bf371be..10769e3 100644 --- a/server/src/main/java/org/apache/kylin/rest/service/QueryService.java +++ b/server/src/main/java/org/apache/kylin/rest/service/QueryService.java @@ -44,8 +44,9 @@ import javax.sql.DataSource; import org.apache.calcite.avatica.ColumnMetaData.Rep; import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.kylin.common.KylinConfig; @@ -128,14 +129,13 @@ public class QueryService extends BasicService { Query[] queryArray = new Query[queries.size()]; byte[] bytes = querySerializer.serialize(queries.toArray(queryArray)); - HTableInterface htable = null; + Table htable = null; try { - htable = HBaseConnection.get(hbaseUrl).getTable(userTableName); + htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName)); Put put = new Put(Bytes.toBytes(creator)); - put.add(Bytes.toBytes(USER_QUERY_FAMILY), Bytes.toBytes(USER_QUERY_COLUMN), bytes); + put.addColumn(Bytes.toBytes(USER_QUERY_FAMILY), Bytes.toBytes(USER_QUERY_COLUMN), bytes); htable.put(put); - htable.flushCommits(); } finally { IOUtils.closeQuietly(htable); } @@ -161,14 +161,13 @@ public class QueryService extends BasicService { Query[] queryArray = new Query[queries.size()]; byte[] bytes = querySerializer.serialize(queries.toArray(queryArray)); - HTableInterface htable = null; + Table htable = null; try { - htable = HBaseConnection.get(hbaseUrl).getTable(userTableName); + htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName)); Put put = new Put(Bytes.toBytes(creator)); - put.add(Bytes.toBytes(USER_QUERY_FAMILY), 
Bytes.toBytes(USER_QUERY_COLUMN), bytes);
+            put.addColumn(Bytes.toBytes(USER_QUERY_FAMILY), Bytes.toBytes(USER_QUERY_COLUMN), bytes);
 
             htable.put(put);
-            htable.flushCommits();
         } finally {
             IOUtils.closeQuietly(htable);
         }
@@ -180,9 +179,9 @@
         }
 
         List<Query> queries = new ArrayList<Query>();
-        HTableInterface htable = null;
+        Table htable = null;
         try {
-            htable = HBaseConnection.get(hbaseUrl).getTable(userTableName);
+            htable = HBaseConnection.get(hbaseUrl).getTable(TableName.valueOf(userTableName));
             Get get = new Get(Bytes.toBytes(creator));
             get.addFamily(Bytes.toBytes(USER_QUERY_FAMILY));
             Result result = htable.get(get);
diff --git a/server/src/main/java/org/apache/kylin/rest/service/UserService.java b/server/src/main/java/org/apache/kylin/rest/service/UserService.java
index e4e2de3..624c49c 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/UserService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/UserService.java
@@ -29,7 +29,7 @@ import javax.annotation.PostConstruct;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
@@ -69,7 +69,7 @@ public class UserService implements UserManager {
 
     @Override
     public UserDetails loadUserByUsername(String username) throws UsernameNotFoundException {
-        HTableInterface htable = null;
+        Table htable = null;
         try {
             htable = aclHBaseStorage.getTable(userTableName);
 
@@ -100,16 +100,15 @@ public class UserService implements UserManager {
 
     @Override
     public void updateUser(UserDetails user) {
-        HTableInterface htable = null;
+        Table htable = null;
         try {
             byte[] userAuthorities = serialize(user.getAuthorities());
             htable = aclHBaseStorage.getTable(userTableName);
 
             Put put = new Put(Bytes.toBytes(user.getUsername()));
-            put.add(Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_FAMILY), Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_COLUMN), userAuthorities);
+            put.addColumn(Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_FAMILY), Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_COLUMN), userAuthorities);
 
             htable.put(put);
-            htable.flushCommits();
         } catch (IOException e) {
             throw new RuntimeException(e.getMessage(), e);
         } finally {
@@ -119,14 +118,13 @@ public class UserService implements UserManager {
 
     @Override
     public void deleteUser(String username) {
-        HTableInterface htable = null;
+        Table htable = null;
         try {
             htable = aclHBaseStorage.getTable(userTableName);
 
             Delete delete = new Delete(Bytes.toBytes(username));
             htable.delete(delete);
-            htable.flushCommits();
         } catch (IOException e) {
             throw new RuntimeException(e.getMessage(), e);
         } finally {
@@ -141,7 +139,7 @@ public class UserService implements UserManager {
 
     @Override
     public boolean userExists(String username) {
-        HTableInterface htable = null;
+        Table htable = null;
         try {
             htable = aclHBaseStorage.getTable(userTableName);
 
@@ -161,7 +159,7 @@
         s.addColumn(Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_FAMILY), Bytes.toBytes(AclHBaseStorage.USER_AUTHORITY_COLUMN));
 
         List authorities = new ArrayList();
-        HTableInterface htable = null;
+        Table htable = null;
         ResultScanner scanner = null;
         try {
             htable = aclHBaseStorage.getTable(userTableName);
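The next file is the hub of the migration: HBaseConnection replaces HConnectionManager with ConnectionFactory, keeps one shared long-lived Connection per storage URL, and obtains Admin per call via conn.getAdmin() instead of new HBaseAdmin(conn). A condensed sketch of that caching pattern, with names simplified and the URL-to-Configuration lookup elided:

    import java.io.IOException;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;

    public final class ConnectionCacheSketch {

        // One shared, thread-safe Connection per storage URL; callers never close it.
        private static final ConcurrentMap<String, Connection> POOL = new ConcurrentHashMap<String, Connection>();

        private ConnectionCacheSketch() {
        }

        public static Connection get(String url) throws IOException {
            Connection conn = POOL.get(url);
            // Like the patch, no double-checked locking: occasionally recreating a
            // connection is harmless, and ConcurrentHashMap keeps the map itself safe.
            if (conn == null || conn.isClosed()) {
                conn = ConnectionFactory.createConnection(HBaseConfiguration.create()); // real code derives the conf from url
                POOL.put(url, conn);
            }
            return conn;
        }

        // Admin replaces "new HBaseAdmin(conn)"; it is cheap and must be closed per use.
        public static boolean tableExists(Connection conn, String tableName) throws IOException {
            try (Admin admin = conn.getAdmin()) {
                return admin.tableExists(TableName.valueOf(tableName));
            }
        }
    }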
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
index 661e8e4..7452ad0 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseConnection.java
@@ -31,9 +31,9 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.StorageException;
 import org.apache.kylin.engine.mr.HadoopUtil;
@@ -49,13 +49,13 @@ public class HBaseConnection {
 
     private static final Logger logger = LoggerFactory.getLogger(HBaseConnection.class);
 
     private static final Map<String, Configuration> ConfigCache = new ConcurrentHashMap<String, Configuration>();
-    private static final Map<String, HConnection> ConnPool = new ConcurrentHashMap<String, HConnection>();
+    private static final Map<String, Connection> ConnPool = new ConcurrentHashMap<String, Connection>();
 
     static {
         Runtime.getRuntime().addShutdownHook(new Thread() {
             @Override
             public void run() {
-                for (HConnection conn : ConnPool.values()) {
+                for (Connection conn : ConnPool.values()) {
                     try {
                         conn.close();
                     } catch (IOException e) {
@@ -121,9 +121,9 @@ public class HBaseConnection {
 
     // ============================================================================
 
-    // returned HConnection can be shared by multiple threads and does not require close()
+    // returned Connection can be shared by multiple threads and does not require close()
     @SuppressWarnings("resource")
-    public static HConnection get(String url) {
+    public static Connection get(String url) {
         // find configuration
         Configuration conf = ConfigCache.get(url);
         if (conf == null) {
@@ -131,13 +131,13 @@ public class HBaseConnection {
             ConfigCache.put(url, conf);
         }
 
-        HConnection connection = ConnPool.get(url);
+        Connection connection = ConnPool.get(url);
         try {
             while (true) {
                 // I don't use DCL since recreate a connection is not a big issue.
                 if (connection == null || connection.isClosed()) {
                     logger.info("connection is null or closed, creating a new one");
-                    connection = HConnectionManager.createConnection(conf);
+                    connection = ConnectionFactory.createConnection(conf);
                     ConnPool.put(url, connection);
                 }
 
@@ -156,8 +156,8 @@ public class HBaseConnection {
         return connection;
     }
 
-    public static boolean tableExists(HConnection conn, String tableName) throws IOException {
-        HBaseAdmin hbase = new HBaseAdmin(conn);
+    public static boolean tableExists(Connection conn, String tableName) throws IOException {
+        Admin hbase = conn.getAdmin();
         try {
             return hbase.tableExists(TableName.valueOf(tableName));
         } finally {
@@ -177,8 +177,8 @@
         deleteTable(HBaseConnection.get(hbaseUrl), tableName);
     }
 
-    public static void createHTableIfNeeded(HConnection conn, String tableName, String... families) throws IOException {
-        HBaseAdmin hbase = new HBaseAdmin(conn);
+    public static void createHTableIfNeeded(Connection conn, String tableName, String...
families) throws IOException { + Admin hbase = conn.getAdmin(); try { if (tableExists(conn, tableName)) { @@ -205,8 +205,8 @@ public class HBaseConnection { } } - public static void deleteTable(HConnection conn, String tableName) throws IOException { - HBaseAdmin hbase = new HBaseAdmin(conn); + public static void deleteTable(Connection conn, String tableName) throws IOException { + Admin hbase = conn.getAdmin(); try { if (!tableExists(conn, tableName)) { @@ -216,10 +216,10 @@ public class HBaseConnection { logger.debug("delete HTable '" + tableName + "'"); - if (hbase.isTableEnabled(tableName)) { - hbase.disableTable(tableName); + if (hbase.isTableEnabled(TableName.valueOf(tableName))) { + hbase.disableTable(TableName.valueOf(tableName)); } - hbase.deleteTable(tableName); + hbase.deleteTable(TableName.valueOf(tableName)); logger.debug("HTable '" + tableName + "' deleted"); } finally { diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java index 6d77240..9420d41 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java @@ -33,10 +33,11 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -83,7 +84,7 @@ public class HBaseResourceStore extends ResourceStore { // final Map tableNameMap; // path prefix ==> HBase table name - private HConnection getConnection() throws IOException { + private Connection getConnection() throws IOException { return HBaseConnection.get(hbaseUrl); } @@ -126,7 +127,7 @@ public class HBaseResourceStore extends ResourceStore { ArrayList result = new ArrayList(); - HTableInterface table = getConnection().getTable(getAllInOneTableName()); + Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName())); Scan scan = new Scan(startRow, endRow); scan.setFilter(new KeyOnlyFilter()); try { @@ -171,7 +172,7 @@ public class HBaseResourceStore extends ResourceStore { } tuneScanParameters(scan); - HTableInterface table = getConnection().getTable(getAllInOneTableName()); + Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName())); List result = Lists.newArrayList(); try { ResultScanner scanner = table.getScanner(scan); @@ -253,13 +254,12 @@ public class HBaseResourceStore extends ResourceStore { IOUtils.copy(content, bout); bout.close(); - HTableInterface table = getConnection().getTable(getAllInOneTableName()); + Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName())); try { byte[] row = Bytes.toBytes(resPath); Put put = buildPut(resPath, ts, row, bout.toByteArray(), table); table.put(put); - table.flushCommits(); } finally { IOUtils.closeQuietly(table); } @@ -267,7 +267,7 @@ public class HBaseResourceStore extends ResourceStore { @Override protected long 
checkAndPutResourceImpl(String resPath, byte[] content, long oldTS, long newTS) throws IOException, IllegalStateException { - HTableInterface table = getConnection().getTable(getAllInOneTableName()); + Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName())); try { byte[] row = Bytes.toBytes(resPath); byte[] bOldTS = oldTS == 0 ? null : Bytes.toBytes(oldTS); @@ -280,8 +280,6 @@ public class HBaseResourceStore extends ResourceStore { throw new IllegalStateException("Overwriting conflict " + resPath + ", expect old TS " + oldTS + ", but it is " + real); } - table.flushCommits(); - return newTS; } finally { IOUtils.closeQuietly(table); @@ -290,11 +288,10 @@ public class HBaseResourceStore extends ResourceStore { @Override protected void deleteResourceImpl(String resPath) throws IOException { - HTableInterface table = getConnection().getTable(getAllInOneTableName()); + Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName())); try { Delete del = new Delete(Bytes.toBytes(resPath)); table.delete(del); - table.flushCommits(); } finally { IOUtils.closeQuietly(table); } @@ -319,7 +316,7 @@ public class HBaseResourceStore extends ResourceStore { get.addColumn(B_FAMILY, B_COLUMN_TS); } - HTableInterface table = getConnection().getTable(getAllInOneTableName()); + Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName())); try { Result result = table.get(get); boolean exists = result != null && (!result.isEmpty() || (result.getExists() != null && result.getExists())); @@ -335,7 +332,7 @@ public class HBaseResourceStore extends ResourceStore { return endRow; } - private Path writeLargeCellToHdfs(String resPath, byte[] largeColumn, HTableInterface table) throws IOException { + private Path writeLargeCellToHdfs(String resPath, byte[] largeColumn, Table table) throws IOException { Path redirectPath = bigCellHDFSPath(resPath); Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration(); FileSystem fileSystem = FileSystem.get(hconf); @@ -361,7 +358,7 @@ public class HBaseResourceStore extends ResourceStore { return redirectPath; } - private Put buildPut(String resPath, long ts, byte[] row, byte[] content, HTableInterface table) throws IOException { + private Put buildPut(String resPath, long ts, byte[] row, byte[] content, Table table) throws IOException { int kvSizeLimit = this.kylinConfig.getHBaseKeyValueSize(); if (content.length > kvSizeLimit) { writeLargeCellToHdfs(resPath, content, table); @@ -369,8 +366,8 @@ public class HBaseResourceStore extends ResourceStore { } Put put = new Put(row); - put.add(B_FAMILY, B_COLUMN, content); - put.add(B_FAMILY, B_COLUMN_TS, Bytes.toBytes(ts)); + put.addColumn(B_FAMILY, B_COLUMN, content); + put.addColumn(B_FAMILY, B_COLUMN_TS, Bytes.toBytes(ts)); return put; } diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java index 909de39..2bc8a3b 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java @@ -25,8 +25,9 @@ import java.util.List; import java.util.NoSuchElementException; import java.util.Set; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.TableName; +import 
org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; @@ -70,7 +71,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator { protected final List rowValueDecoders; private final StorageContext context; private final String tableName; - private final HTableInterface table; + private final Table table; protected CubeTupleConverter tupleConverter; protected final Iterator rangeIterator; @@ -88,7 +89,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator { private int advMeasureRowsRemaining; private int advMeasureRowIndex; - public CubeSegmentTupleIterator(CubeSegment cubeSeg, List keyRanges, HConnection conn, // + public CubeSegmentTupleIterator(CubeSegment cubeSeg, List keyRanges, Connection conn, // Set dimensions, TupleFilter filter, Set groupBy, // List rowValueDecoders, StorageContext context, TupleInfo returnTupleInfo) { this.cubeSeg = cubeSeg; @@ -108,7 +109,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator { this.rangeIterator = keyRanges.iterator(); try { - this.table = conn.getTable(tableName); + this.table = conn.getTable(TableName.valueOf(tableName)); } catch (Throwable t) { throw new StorageException("Error when open connection to table " + tableName, t); } diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java index 3d7f620..952e9af 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java @@ -33,7 +33,7 @@ import java.util.Map; import java.util.Set; import java.util.TreeSet; -import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.Connection; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.common.util.Dictionary; @@ -148,7 +148,7 @@ public class CubeStorageQuery implements ICachableStorageQuery { setCoprocessor(groupsCopD, valueDecoders, context); // enable coprocessor if beneficial setLimit(filter, context); - HConnection conn = HBaseConnection.get(context.getConnUrl()); + Connection conn = HBaseConnection.get(context.getConnUrl()); // notice we're passing filterD down to storage instead of flatFilter return new SerializedHBaseTupleIterator(conn, scans, cubeInstance, dimensionsD, filterD, groupsCopD, valueDecoders, context, returnTupleInfo); diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/HBaseClientKVIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/HBaseClientKVIterator.java index 8aace22..54c399f 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/HBaseClientKVIterator.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/HBaseClientKVIterator.java @@ -24,8 +24,9 @@ import java.util.Iterator; import org.apache.commons.io.IOUtils; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Result; import 
org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; @@ -40,14 +41,14 @@ public class HBaseClientKVIterator implements Iterable, Closeable { byte[] family; - HTableInterface table; + Table table; ResultScanner scanner; Iterator iterator; - public HBaseClientKVIterator(HConnection hconn, String tableName, byte[] family) throws IOException { + public HBaseClientKVIterator(Connection hconn, String tableName, byte[] family) throws IOException { this.family = family; - this.table = hconn.getTable(tableName); + this.table = hconn.getTable(TableName.valueOf(tableName)); this.scanner = table.getScanner(family); this.iterator = scanner.iterator(); } diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/RegionScannerAdapter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/RegionScannerAdapter.java index 6342c5c..25c2dda 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/RegionScannerAdapter.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/RegionScannerAdapter.java @@ -23,9 +23,11 @@ import java.util.List; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.regionserver.ScannerContext; /** * @author yangli9 @@ -50,7 +52,7 @@ public class RegionScannerAdapter implements RegionScanner { } @Override - public boolean next(List result, int limit) throws IOException { + public boolean next(List result, ScannerContext scannerContext) throws IOException { return next(result); } @@ -60,7 +62,7 @@ public class RegionScannerAdapter implements RegionScanner { } @Override - public boolean nextRaw(List result, int limit) throws IOException { + public boolean nextRaw(List result, ScannerContext scannerContext) throws IOException { return next(result); } @@ -94,4 +96,9 @@ public class RegionScannerAdapter implements RegionScanner { return Long.MAX_VALUE; } + @Override + public int getBatch() { + return -1; + } + } diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java index e8dd5b9..d033c77 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java @@ -25,7 +25,7 @@ import java.util.Map; import java.util.NoSuchElementException; import java.util.Set; -import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.Connection; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.metadata.filter.TupleFilter; @@ -57,7 +57,7 @@ public class SerializedHBaseTupleIterator implements ITupleIterator { private int scanCount; private ITuple next; - public SerializedHBaseTupleIterator(HConnection conn, List segmentKeyRanges, CubeInstance cube, // + public SerializedHBaseTupleIterator(Connection conn, List segmentKeyRanges, CubeInstance cube, // Set dimensions, TupleFilter filter, Set groupBy, List rowValueDecoders, // StorageContext context, TupleInfo returnTupleInfo) { diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserver.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserver.java index c7b650a..8dba1b1 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserver.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserver.java @@ -26,7 +26,7 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorBehavior; @@ -99,7 +99,7 @@ public class AggregateRegionObserver extends BaseRegionObserver { // start/end region operation & sync on scanner is suggested by the // javadoc of RegionScanner.nextRaw() // FIXME: will the lock still work when a iterator is returned? is it safe? Is readonly attribute helping here? by mhb - HRegion region = ctxt.getEnvironment().getRegion(); + Region region = ctxt.getEnvironment().getRegion(); region.startRegionOperation(); try { synchronized (innerScanner) { diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregationScanner.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregationScanner.java index d8b61b3..635a32f 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregationScanner.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregationScanner.java @@ -24,7 +24,9 @@ import java.util.List; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.regionserver.ScannerContext; import org.apache.kylin.measure.MeasureAggregator; import org.apache.kylin.storage.hbase.common.coprocessor.AggrKey; import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorBehavior; @@ -116,8 +118,8 @@ public class AggregationScanner implements RegionScanner { } @Override - public boolean next(List result, int limit) throws IOException { - return outerScanner.next(result, limit); + public boolean next(List result, ScannerContext scannerContext) throws IOException { + return outerScanner.next(result, scannerContext); } @Override @@ -126,8 +128,8 @@ public class AggregationScanner implements RegionScanner { } @Override - public boolean nextRaw(List result, int limit) throws IOException { - return outerScanner.nextRaw(result, limit); + public boolean nextRaw(List result, ScannerContext scannerContext) throws IOException { + return outerScanner.nextRaw(result, scannerContext); } @Override @@ -160,6 +162,11 @@ public class AggregationScanner implements RegionScanner { return outerScanner.getMvccReadPoint(); } + @Override + public int getBatch() { + return outerScanner.getBatch(); + } + private static class Stats { long inputRows = 0; long inputBytes = 0; diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregationCache.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregationCache.java index 8404262..1809a44 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregationCache.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregationCache.java @@ -24,12 +24,10 @@ import java.util.Iterator; import java.util.List; import java.util.Map.Entry; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.KeyValue.Type; import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.regionserver.ScannerContext; import org.apache.kylin.measure.MeasureAggregator; import org.apache.kylin.storage.hbase.common.coprocessor.AggrKey; import org.apache.kylin.storage.hbase.common.coprocessor.AggregationCache; @@ -112,7 +110,7 @@ public class ObserverAggregationCache extends AggregationCache { } @Override - public boolean next(List result, int limit) throws IOException { + public boolean next(List result, ScannerContext scannerContext) throws IOException { return next(result); } @@ -122,7 +120,7 @@ public class ObserverAggregationCache extends AggregationCache { } @Override - public boolean nextRaw(List result, int limit) throws IOException { + public boolean nextRaw(List result, ScannerContext scannerContext) throws IOException { return next(result); } @@ -161,6 +159,11 @@ public class ObserverAggregationCache extends AggregationCache { // AggregateRegionObserver.LOG.info("Kylin Scanner getMvccReadPoint()"); return Long.MAX_VALUE; } + + @Override + public int getBatch() { + return innerScanner.getBatch(); + } } } diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java index b8ac3d5..7435d07 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java @@ -23,7 +23,7 @@ import java.util.Collection; import java.util.Map; import java.util.Set; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.regionserver.RegionScanner; @@ -60,7 +60,7 @@ public class ObserverEnabler { static final Map CUBE_OVERRIDES = Maps.newConcurrentMap(); public static ResultScanner scanWithCoprocessorIfBeneficial(CubeSegment segment, Cuboid cuboid, TupleFilter tupleFiler, // - Collection groupBy, Collection rowValueDecoders, StorageContext context, HTableInterface table, Scan scan) throws IOException { + Collection groupBy, Collection rowValueDecoders, StorageContext context, Table table, Scan scan) throws IOException { if (context.isCoprocessorEnabled() == false) { return table.getScanner(scan); diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java index e6f9ac1..902872f 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java @@ -38,8 +38,9 @@ import javax.annotation.Nullable; import org.apache.commons.lang.NotImplementedException; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.ipc.BlockingRpcCallback; import org.apache.hadoop.hbase.ipc.ServerRpcController; @@ -258,7 +259,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { final ImmutableBitSet selectedColBlocks = scanRequests.get(0).getSelectedColBlocks().set(0); // globally shared connection, does not require close - final HConnection conn = HBaseConnection.get(cubeSeg.getCubeInstance().getConfig().getStorageUrl()); + final Connection conn = HBaseConnection.get(cubeSeg.getCubeInstance().getConfig().getStorageUrl()); final List hbaseColumnsToGTIntList = Lists.newArrayList(); List> hbaseColumnsToGT = getHBaseColumnsGTMapping(selectedColBlocks); @@ -310,7 +311,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { Map results; try { - results = getResults(builder.build(), conn.getTable(cubeSeg.getStorageLocationIdentifier()), epRange.getFirst(), epRange.getSecond()); + results = getResults(builder.build(), conn.getTable(TableName.valueOf(cubeSeg.getStorageLocationIdentifier())), epRange.getFirst(), epRange.getSecond()); } catch (Throwable throwable) { throw new RuntimeException("Error when visiting cubes by endpoint:", throwable); } @@ -345,7 +346,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { } - private Map getResults(final CubeVisitProtos.CubeVisitRequest request, HTableInterface table, byte[] startKey, byte[] endKey) throws Throwable { + private Map getResults(final CubeVisitProtos.CubeVisitRequest request, Table table, byte[] startKey, byte[] endKey) throws Throwable { Map results = table.coprocessorService(CubeVisitProtos.CubeVisitService.class, startKey, endKey, new Batch.Call() { public CubeVisitProtos.CubeVisitResponse call(CubeVisitProtos.CubeVisitService rowsService) throws IOException { ServerRpcController controller = new ServerRpcController(); diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java index ef53cb7..2b1b512 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java @@ -25,8 +25,9 @@ import java.util.List; import javax.annotation.Nullable; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; @@ -135,8 +136,8 @@ public class 
CubeHBaseScanRPC extends CubeHBaseRPC { // primary key (also the 0th column block) is always selected final ImmutableBitSet selectedColBlocks = scanRequest.getSelectedColBlocks().set(0); // globally shared connection, does not require close - HConnection hbaseConn = HBaseConnection.get(cubeSeg.getCubeInstance().getConfig().getStorageUrl()); - final HTableInterface hbaseTable = hbaseConn.getTable(cubeSeg.getStorageLocationIdentifier()); + Connection hbaseConn = HBaseConnection.get(cubeSeg.getCubeInstance().getConfig().getStorageUrl()); + final Table hbaseTable = hbaseConn.getTable(TableName.valueOf(cubeSeg.getStorageLocationIdentifier())); List rawScans = preparedHBaseScans(scanRequest.getPkStart(), scanRequest.getPkEnd(), scanRequest.getFuzzyKeys(), selectedColBlocks); List> hbaseColumnsToGT = getHBaseColumnsGTMapping(selectedColBlocks); diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java index 7ac4be1..dddb2ad 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java @@ -129,7 +129,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement if (shardLength == 0) { return; } - byte[] regionStartKey = ArrayUtils.isEmpty(region.getStartKey()) ? new byte[shardLength] : region.getStartKey(); + byte[] regionStartKey = ArrayUtils.isEmpty(region.getRegionInfo().getStartKey()) ? new byte[shardLength] : region.getRegionInfo().getStartKey(); Bytes.putBytes(rawScan.startKey, 0, regionStartKey, 0, shardLength); Bytes.putBytes(rawScan.endKey, 0, regionStartKey, 0, shardLength); } @@ -150,7 +150,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement try { this.serviceStartTime = System.currentTimeMillis(); - region = env.getRegion(); + region = (HRegion)env.getRegion(); region.startRegionOperation(); GTScanRequest scanReq = GTScanRequest.serializer.deserialize(ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getGtScanRequest()))); diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHTableJob.java index 9c96f21..a277eeb 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHTableJob.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHTableJob.java @@ -24,7 +24,8 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy; @@ -32,6 +33,7 @@ import org.apache.hadoop.hbase.security.User; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; +import org.apache.kylin.storage.hbase.HBaseConnection; import 
org.apache.kylin.storage.hbase.util.IIDeployCoprocessorCLI; import org.apache.kylin.invertedindex.IIInstance; import org.apache.kylin.invertedindex.IIManager; @@ -47,7 +49,7 @@ public class IICreateHTableJob extends AbstractHadoopJob { @Override public int run(String[] args) throws Exception { Options options = new Options(); - HBaseAdmin admin = null; + Admin admin = null; try { options.addOption(OPTION_II_NAME); options.addOption(OPTION_HTABLE_NAME); @@ -63,10 +65,11 @@ public class IICreateHTableJob extends AbstractHadoopJob { Configuration conf = HBaseConfiguration.create(getConf()); + Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()); // check if the table already exists - admin = new HBaseAdmin(conf); - if (admin.tableExists(tableName)) { - if (admin.isTableEnabled(tableName)) { + admin = conn.getAdmin(); + if (admin.tableExists(TableName.valueOf(tableName))) { + if (admin.isTableEnabled(TableName.valueOf(tableName))) { logger.info("Table " + tableName + " already exists and is enabled, no need to create."); return 0; } else { diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/InvertedIndexStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/InvertedIndexStorageQuery.java index 5afb62d..dd65f9a 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/InvertedIndexStorageQuery.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/InvertedIndexStorageQuery.java @@ -20,7 +20,7 @@ package org.apache.kylin.storage.hbase.ii; import java.util.ArrayList; -import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.Connection; import org.apache.kylin.invertedindex.IIInstance; import org.apache.kylin.invertedindex.IISegment; import org.apache.kylin.metadata.realization.SQLDigest; @@ -55,8 +55,8 @@ public class InvertedIndexStorageQuery implements ICachableStorageQuery { public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) { String tableName = seg.getStorageLocationIdentifier(); - //HConnection is cached, so need not be closed - HConnection conn = HBaseConnection.get(context.getConnUrl()); + //Connection is cached, so need not be closed + Connection conn = HBaseConnection.get(context.getConnUrl()); try { dataIterator = new EndpointTupleIterator(seg, sqlDigest.filter, sqlDigest.groupbyColumns, new ArrayList<>(sqlDigest.aggregations), context, conn, returnTupleInfo); return dataIterator; diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java index 4ec421b..8da8e59 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java @@ -31,8 +31,9 @@ import javax.annotation.Nullable; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.SerializationUtils; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.ipc.BlockingRpcCallback; import 
org.apache.hadoop.hbase.ipc.ServerRpcController; @@ -93,16 +94,16 @@ public class EndpointTupleIterator implements ITupleIterator { private Iterator> regionResponsesIterator = null; private ITupleIterator tupleIterator = null; - private HTableInterface table = null; + private Table table = null; private TblColRef partitionCol; private long lastDataTime = -1; private int rowsInAllMetric = 0; - public EndpointTupleIterator(IISegment segment, TupleFilter rootFilter, Collection groupBy, List measures, StorageContext context, HConnection conn, TupleInfo returnTupleInfo) throws Throwable { + public EndpointTupleIterator(IISegment segment, TupleFilter rootFilter, Collection groupBy, List measures, StorageContext context, Connection conn, TupleInfo returnTupleInfo) throws Throwable { String tableName = segment.getStorageLocationIdentifier(); - table = conn.getTable(tableName); + table = conn.getTable(TableName.valueOf(tableName)); factTableName = segment.getIIDesc().getFactTableName(); if (rootFilter == null) { @@ -290,7 +291,7 @@ public class EndpointTupleIterator implements ITupleIterator { return request; } - private Collection getResults(final IIProtos.IIRequest request, HTableInterface table) throws Throwable { + private Collection getResults(final IIProtos.IIRequest request, Table table) throws Throwable { Map results = table.coprocessorService(IIProtos.RowsService.class, null, null, new Batch.Call() { public IIProtos.IIResponse call(IIProtos.RowsService rowsService) throws IOException { ServerRpcController controller = new ServerRpcController(); diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIEndpoint.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIEndpoint.java index af7b993..4724188 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIEndpoint.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/IIEndpoint.java @@ -89,7 +89,7 @@ public class IIEndpoint extends IIProtos.RowsService implements Coprocessor, Cop if (request.hasTsRange()) { Range tsRange = (Range) SerializationUtils.deserialize(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getTsRange())); - byte[] regionStartKey = region.getStartKey(); + byte[] regionStartKey = region.getRegionInfo().getStartKey(); if (!ArrayUtils.isEmpty(regionStartKey)) { shard = BytesUtil.readUnsigned(regionStartKey, 0, IIKeyValueCodec.SHARD_LEN); } else { @@ -143,7 +143,7 @@ public class IIEndpoint extends IIProtos.RowsService implements Coprocessor, Cop HRegion region = null; try { - region = env.getRegion(); + region = (HRegion)env.getRegion(); region.startRegionOperation(); innerScanner = region.getScanner(prepareScan(request, region)); diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java index f2aba0a..be32903 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHTableUtil.java @@ -26,7 +26,9 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.Admin; +import 
org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy; @@ -70,7 +72,8 @@ public class CubeHTableUtil { tableDesc.setValue(IRealizationConstants.HTableUser, cubeInstance.getOwner()); Configuration conf = HBaseConfiguration.create(); - HBaseAdmin admin = new HBaseAdmin(conf); + Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create()); + Admin admin = conn.getAdmin(); try { if (User.isHBaseSecurityEnabled(conf)) { @@ -117,7 +120,7 @@ public class CubeHTableUtil { tableDesc.addFamily(cf); } - if (admin.tableExists(tableName)) { + if (admin.tableExists(TableName.valueOf(tableName))) { // admin.disableTable(tableName); // admin.deleteTable(tableName); throw new RuntimeException("HBase table " + tableName + " exists!"); @@ -126,7 +129,7 @@ public class CubeHTableUtil { DeployCoprocessorCLI.deployCoprocessor(tableDesc); admin.createTable(tableDesc, splitKeys); - Preconditions.checkArgument(admin.isTableAvailable(tableName), "table " + tableName + " created, but is not available due to some reasons"); + Preconditions.checkArgument(admin.isTableAvailable(TableName.valueOf(tableName)), "table " + tableName + " created, but is not available due to some reasons"); logger.info("create hbase table " + tableName + " done."); } catch (Exception e) { logger.error("Failed to create HTable", e); diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/DeprecatedGCStep.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/DeprecatedGCStep.java index 735f967..9dc9715 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/DeprecatedGCStep.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/DeprecatedGCStep.java @@ -27,17 +27,18 @@ import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.Bytes; import org.apache.kylin.engine.mr.HadoopUtil; import org.apache.kylin.job.exception.ExecuteException; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableContext; import org.apache.kylin.job.execution.ExecuteResult; import org.apache.kylin.metadata.realization.IRealizationConstants; +import org.apache.kylin.storage.hbase.HBaseConnection; import org.apache.kylin.storage.hbase.util.HiveCmdBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -99,19 +100,21 @@ public class DeprecatedGCStep extends AbstractExecutable { List oldTables = getOldHTables(); if (oldTables != null && oldTables.size() > 0) { String metadataUrlPrefix = KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix(); - Configuration conf = HBaseConfiguration.create(); - HBaseAdmin admin = null; + Admin admin = null; try { - admin = new HBaseAdmin(conf); + + Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()); + admin = conn.getAdmin(); + 
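// The hunk above replaces `new HBaseAdmin(conf)` with an Admin handed out by a
// Connection, and every String-keyed admin call now goes through TableName.
// A minimal self-contained sketch of that drop-table idiom follows; the class
// name DropTableSketch and the table name are hypothetical, not part of this
// patch. Note the patch reuses Kylin's cached HBaseConnection, which must not
// be closed; the sketch opens its own Connection, so try-with-resources is safe.

import java.io.IOException;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class DropTableSketch {

    static void dropIfExists(Connection conn, String name) throws IOException {
        TableName table = TableName.valueOf(name);
        try (Admin admin = conn.getAdmin()) { // Admin is Closeable in 1.x
            if (admin.tableExists(table)) {
                if (admin.isTableEnabled(table)) {
                    admin.disableTable(table); // a table must be disabled before deletion
                }
                admin.deleteTable(table);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create())) {
            dropIfExists(conn, "KYLIN_EXAMPLE"); // hypothetical table name
        }
    }
}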
for (String table : oldTables) { - if (admin.tableExists(table)) { - HTableDescriptor tableDescriptor = admin.getTableDescriptor(Bytes.toBytes(table)); + if (admin.tableExists(TableName.valueOf(table))) { + HTableDescriptor tableDescriptor = admin.getTableDescriptor(TableName.valueOf(table)); String host = tableDescriptor.getValue(IRealizationConstants.HTableTag); if (metadataUrlPrefix.equalsIgnoreCase(host)) { - if (admin.isTableEnabled(table)) { - admin.disableTable(table); + if (admin.isTableEnabled(TableName.valueOf(table))) { + admin.disableTable(TableName.valueOf(table)); } - admin.deleteTable(table); + admin.deleteTable(TableName.valueOf(table)); logger.debug("Dropped HBase table " + table); output.append("Dropped HBase table " + table + " \n"); } else { diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java index ddc868d..5063e00 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java @@ -36,7 +36,7 @@ package org.apache.kylin.storage.hbase.steps; import com.google.common.collect.Lists; import org.apache.commons.io.IOUtils; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Put; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.cube.CubeSegment; @@ -63,7 +63,7 @@ public class HBaseCuboidWriter implements ICuboidWriter { private final List keyValueCreators; private final int nColumns; - private final HTableInterface hTable; + private final Table hTable; private final CubeDesc cubeDesc; private final CubeSegment cubeSegment; private final Object[] measureValues; @@ -72,7 +72,7 @@ public class HBaseCuboidWriter implements ICuboidWriter { private AbstractRowKeyEncoder rowKeyEncoder; private byte[] keybuf; - public HBaseCuboidWriter(CubeSegment segment, HTableInterface hTable) { + public HBaseCuboidWriter(CubeSegment segment, Table hTable) { this.keyValueCreators = Lists.newArrayList(); this.cubeSegment = segment; this.cubeDesc = cubeSegment.getCubeDesc(); @@ -131,7 +131,6 @@ public class HBaseCuboidWriter implements ICuboidWriter { long t = System.currentTimeMillis(); if (hTable != null) { hTable.put(puts); - hTable.flushCommits(); } logger.info("commit total " + puts.size() + " puts, totally cost:" + (System.currentTimeMillis() - t) + "ms"); puts.clear(); diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java index fa62a62..cf5c9d4 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java @@ -28,7 +28,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Table; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; import org.apache.kylin.common.persistence.ResourceStore; @@ -55,7 
+56,7 @@ public class HBaseStreamingOutput implements IStreamingOutput { try { CubeSegment cubeSegment = (CubeSegment) buildable; - final HTableInterface hTable; + final Table hTable; hTable = createHTable(cubeSegment); List cuboidWriters = Lists.newArrayList(); cuboidWriters.add(new HBaseCuboidWriter(cubeSegment, hTable)); @@ -87,10 +88,10 @@ public class HBaseStreamingOutput implements IStreamingOutput { } } - private HTableInterface createHTable(final CubeSegment cubeSegment) throws IOException { + private Table createHTable(final CubeSegment cubeSegment) throws IOException { final String hTableName = cubeSegment.getStorageLocationIdentifier(); CubeHTableUtil.createHTable(cubeSegment, null); - final HTableInterface hTable = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getTable(hTableName); + final Table hTable = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getTable(TableName.valueOf(hTableName)); logger.info("hTable:" + hTableName + " for segment:" + cubeSegment.getName() + " created!"); return hTable; } diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java index a4a8a35..dfde2dd 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java @@ -24,17 +24,17 @@ import java.util.Collections; import java.util.List; import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.Bytes; import org.apache.kylin.job.exception.ExecuteException; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableContext; import org.apache.kylin.job.execution.ExecuteResult; import org.apache.kylin.metadata.realization.IRealizationConstants; +import org.apache.kylin.storage.hbase.HBaseConnection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,19 +69,20 @@ public class MergeGCStep extends AbstractExecutable { List oldTables = getOldHTables(); if (oldTables != null && oldTables.size() > 0) { String metadataUrlPrefix = KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix(); - Configuration conf = HBaseConfiguration.create(); - HBaseAdmin admin = null; + Admin admin = null; try { - admin = new HBaseAdmin(conf); + Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()); + admin = conn.getAdmin(); + for (String table : oldTables) { - if (admin.tableExists(table)) { - HTableDescriptor tableDescriptor = admin.getTableDescriptor(Bytes.toBytes(table)); + if (admin.tableExists(TableName.valueOf(table))) { + HTableDescriptor tableDescriptor = admin.getTableDescriptor(TableName.valueOf((table))); String host = tableDescriptor.getValue(IRealizationConstants.HTableTag); if (metadataUrlPrefix.equalsIgnoreCase(host)) { - if (admin.isTableEnabled(table)) { - admin.disableTable(table); + if (admin.isTableEnabled(TableName.valueOf(table))) { + admin.disableTable(TableName.valueOf(table)); } - admin.deleteTable(table); + 
admin.deleteTable(TableName.valueOf(table)); logger.debug("Dropped htable: " + table); output.append("HBase table " + table + " is dropped. \n"); } else { diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CleanHtableCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CleanHtableCLI.java index 957e04b..51ecc8f 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CleanHtableCLI.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CleanHtableCLI.java @@ -21,13 +21,15 @@ package org.apache.kylin.storage.hbase.util; import java.io.IOException; import org.apache.commons.cli.Options; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.util.ToolRunner; +import org.apache.kylin.common.KylinConfig; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.metadata.realization.IRealizationConstants; +import org.apache.kylin.storage.hbase.HBaseConnection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,8 +54,9 @@ public class CleanHtableCLI extends AbstractHadoopJob { } private void clean() throws IOException { - Configuration conf = HBaseConfiguration.create(); - HBaseAdmin hbaseAdmin = new HBaseAdmin(conf); + + Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()); + Admin hbaseAdmin = conn.getAdmin(); for (HTableDescriptor descriptor : hbaseAdmin.listTables()) { String name = descriptor.getNameAsString().toLowerCase(); @@ -64,7 +67,7 @@ public class CleanHtableCLI extends AbstractHadoopJob { System.out.println(); descriptor.setValue(IRealizationConstants.HTableOwner, "DL-eBay-Kylin@ebay.com"); - hbaseAdmin.modifyTable(descriptor.getNameAsString(), descriptor); + hbaseAdmin.modifyTable(TableName.valueOf(descriptor.getNameAsString()), descriptor); } } hbaseAdmin.close(); diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java index 09aab48..f15100d 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java @@ -73,7 +73,7 @@ public class CubeMigrationCLI { private static ResourceStore srcStore; private static ResourceStore dstStore; private static FileSystem hdfsFS; - private static HBaseAdmin hbaseAdmin; + private static Admin hbaseAdmin; public static final String ACL_INFO_FAMILY = "i"; private static final String ACL_TABLE_NAME = "_acl"; @@ -117,8 +117,8 @@ public class CubeMigrationCLI { checkAndGetHbaseUrl(); - Configuration conf = HBaseConfiguration.create(); - hbaseAdmin = new HBaseAdmin(conf); + Connection conn = HBaseConnection.get(srcConfig.getStorageUrl()); + hbaseAdmin = conn.getAdmin(); hdfsFS = FileSystem.get(new Configuration()); @@ -143,6 +143,8 @@ public class CubeMigrationCLI { } checkMigrationSuccess(dstConfig, cubeName, true); + + IOUtils.closeQuietly(hbaseAdmin); } public static void moveCube(String srcCfgUri, String dstCfgUri, String cubeName, String projectName, String copyAcl, String purgeAndDisable, String overwriteIfExists, String realExecute) 
throws IOException, InterruptedException { @@ -311,10 +313,10 @@ case CHANGE_HTABLE_HOST: { String tableName = (String) opt.params[0]; HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName)); - hbaseAdmin.disableTable(tableName); + hbaseAdmin.disableTable(TableName.valueOf(tableName)); desc.setValue(IRealizationConstants.HTableTag, dstConfig.getMetadataUrlPrefix()); - hbaseAdmin.modifyTable(tableName, desc); - hbaseAdmin.enableTable(tableName); + hbaseAdmin.modifyTable(TableName.valueOf(tableName), desc); + hbaseAdmin.enableTable(TableName.valueOf(tableName)); logger.info("CHANGE_HTABLE_HOST is completed"); break; } @@ -425,11 +427,11 @@ Serializer projectSerializer = new JsonSerializer(ProjectInstance.class); ProjectInstance project = dstStore.getResource(projectResPath, ProjectInstance.class, projectSerializer); String projUUID = project.getUuid(); - HTableInterface srcAclHtable = null; - HTableInterface destAclHtable = null; + Table srcAclHtable = null; + Table destAclHtable = null; try { - srcAclHtable = HBaseConnection.get(srcConfig.getStorageUrl()).getTable(srcConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME); - destAclHtable = HBaseConnection.get(dstConfig.getStorageUrl()).getTable(dstConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME); + srcAclHtable = HBaseConnection.get(srcConfig.getStorageUrl()).getTable(TableName.valueOf(srcConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME)); + destAclHtable = HBaseConnection.get(dstConfig.getStorageUrl()).getTable(TableName.valueOf(dstConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME)); // cube acl Result result = srcAclHtable.get(new Get(Bytes.toBytes(cubeId))); @@ -445,11 +447,10 @@ value = Bytes.toBytes(valueString); } Put put = new Put(Bytes.toBytes(cubeId)); - put.add(family, column, value); + put.addColumn(family, column, value); destAclHtable.put(put); } } - destAclHtable.flushCommits(); } finally { IOUtils.closeQuietly(srcAclHtable); IOUtils.closeQuietly(destAclHtable); @@ -476,10 +477,10 @@ case CHANGE_HTABLE_HOST: { String tableName = (String) opt.params[0]; HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName)); - hbaseAdmin.disableTable(tableName); + hbaseAdmin.disableTable(TableName.valueOf(tableName)); desc.setValue(IRealizationConstants.HTableTag, srcConfig.getMetadataUrlPrefix()); - hbaseAdmin.modifyTable(tableName, desc); - hbaseAdmin.enableTable(tableName); + hbaseAdmin.modifyTable(TableName.valueOf(tableName), desc); + hbaseAdmin.enableTable(TableName.valueOf(tableName)); break; } case COPY_FILE_IN_META: { @@ -509,13 +510,12 @@ case COPY_ACL: { String cubeId = (String) opt.params[0]; String modelId = (String) opt.params[1]; - HTableInterface destAclHtable = null; + Table destAclHtable = null; try { - destAclHtable = HBaseConnection.get(dstConfig.getStorageUrl()).getTable(dstConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME); + destAclHtable = HBaseConnection.get(dstConfig.getStorageUrl()).getTable(TableName.valueOf(dstConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME)); destAclHtable.delete(new Delete(Bytes.toBytes(cubeId))); destAclHtable.delete(new Delete(Bytes.toBytes(modelId))); - destAclHtable.flushCommits(); } finally { IOUtils.closeQuietly(destAclHtable); }
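The ACL hunks above fold two 1.x client changes into one place: Put.add(family, qualifier, value) is gone in favor of Put.addColumn, and HTableInterface.flushCommits() disappears because Table.put flushes synchronously (client-side write buffering moved to BufferedMutator). A minimal sketch of the verbatim cell-copy idiom, assuming src and dst are open Table handles; class and method names are hypothetical. Where a value has been transformed first, as in the UUID rewrite above, the transformed byte[] is passed instead of CellUtil.cloneValue(cell):

import java.io.IOException;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;

class AclRowCopy {
    // Copies every cell of one row between tables; names are hypothetical.
    static void copyRow(Table src, Table dst, byte[] rowKey) throws IOException {
        Result result = src.get(new Get(rowKey));
        Put put = new Put(rowKey);
        for (Cell cell : result.rawCells()) {
            // Put.add(family, qualifier, value) was removed in the 1.x client
            put.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell), CellUtil.cloneValue(cell));
        }
        if (!put.isEmpty()) {
            dst.put(put); // synchronous; no flushCommits() equivalent needed
        }
    }
}

diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCheckCLI.java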
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCheckCLI.java index d3a85f0..b735864 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCheckCLI.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCheckCLI.java @@ -27,13 +27,16 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.OptionsHelper; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.metadata.realization.IRealizationConstants; +import org.apache.kylin.storage.hbase.HBaseConnection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,7 +63,7 @@ public class CubeMigrationCheckCLI { private KylinConfig dstCfg; - private HBaseAdmin hbaseAdmin; + private Admin hbaseAdmin; private List issueExistHTables; private List inconsistentHTables; @@ -128,9 +131,8 @@ public class CubeMigrationCheckCLI { this.dstCfg = kylinConfig; this.ifFix = isFix; - Configuration conf = HBaseConfiguration.create(); - hbaseAdmin = new HBaseAdmin(conf); - + Connection conn = HBaseConnection.get(kylinConfig.getStorageUrl()); + hbaseAdmin = conn.getAdmin(); issueExistHTables = Lists.newArrayList(); inconsistentHTables = Lists.newArrayList(); } @@ -187,10 +189,10 @@ public class CubeMigrationCheckCLI { String[] sepNameList = segFullName.split(","); HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(sepNameList[0])); logger.info("Change the host of htable "+sepNameList[0]+"belonging to cube "+sepNameList[1]+" from "+desc.getValue(IRealizationConstants.HTableTag)+" to "+dstCfg.getMetadataUrlPrefix()); - hbaseAdmin.disableTable(sepNameList[0]); + hbaseAdmin.disableTable(TableName.valueOf(sepNameList[0])); desc.setValue(IRealizationConstants.HTableTag, dstCfg.getMetadataUrlPrefix()); - hbaseAdmin.modifyTable(sepNameList[0], desc); - hbaseAdmin.enableTable(sepNameList[0]); + hbaseAdmin.modifyTable(TableName.valueOf(sepNameList[0]), desc); + hbaseAdmin.enableTable(TableName.valueOf(sepNameList[0])); } }else{ logger.info("------ Inconsistent HTables Needed To Be Fixed ------"); diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java index 5bca721..749f8aa 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java @@ -40,7 +40,9 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; -import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.kylin.common.KylinConfig; import 
org.apache.kylin.common.util.Bytes; @@ -73,7 +75,8 @@ public class DeployCoprocessorCLI { KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration(); FileSystem fileSystem = FileSystem.get(hconf); - HBaseAdmin hbaseAdmin = new HBaseAdmin(hconf); + Connection conn = HBaseConnection.get(kylinConfig.getStorageUrl()); + Admin hbaseAdmin = conn.getAdmin(); String localCoprocessorJar = new File(args[0]).getAbsolutePath(); logger.info("Identify coprocessor jar " + localCoprocessorJar); @@ -154,10 +157,10 @@ public class DeployCoprocessorCLI { public static void deployCoprocessor(HTableDescriptor tableDesc) { try { initHTableCoprocessor(tableDesc); - logger.info("hbase table " + tableDesc.getName() + " deployed with coprocessor."); + logger.info("hbase table " + tableDesc.getTableName() + " deployed with coprocessor."); } catch (Exception ex) { - logger.error("Error deploying coprocessor on " + tableDesc.getName(), ex); + logger.error("Error deploying coprocessor on " + tableDesc.getTableName(), ex); logger.error("Will try creating the table without coprocessor."); } } @@ -180,9 +183,9 @@ public class DeployCoprocessorCLI { desc.addCoprocessor(CubeObserverClass, hdfsCoprocessorJar, 1002, null); } - public static void resetCoprocessor(String tableName, HBaseAdmin hbaseAdmin, Path hdfsCoprocessorJar) throws IOException { + public static void resetCoprocessor(String tableName, Admin hbaseAdmin, Path hdfsCoprocessorJar) throws IOException { logger.info("Disable " + tableName); - hbaseAdmin.disableTable(tableName); + hbaseAdmin.disableTable(TableName.valueOf(tableName)); logger.info("Unset coprocessor on " + tableName); HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName)); @@ -203,13 +206,13 @@ public class DeployCoprocessorCLI { desc.removeCoprocessor(IIEndpointClassOld); } addCoprocessorOnHTable(desc, hdfsCoprocessorJar); - hbaseAdmin.modifyTable(tableName, desc); + hbaseAdmin.modifyTable(TableName.valueOf(tableName), desc); logger.info("Enable " + tableName); - hbaseAdmin.enableTable(tableName); + hbaseAdmin.enableTable(TableName.valueOf(tableName)); } - private static List resetCoprocessorOnHTables(HBaseAdmin hbaseAdmin, Path hdfsCoprocessorJar, List tableNames) throws IOException { + private static List resetCoprocessorOnHTables(Admin hbaseAdmin, Path hdfsCoprocessorJar, List tableNames) throws IOException { List processed = new ArrayList(); for (String tableName : tableNames) { @@ -320,7 +323,7 @@ public class DeployCoprocessorCLI { return coprocessorDir; } - private static Set getCoprocessorJarPaths(HBaseAdmin hbaseAdmin, List tableNames) throws IOException { + private static Set getCoprocessorJarPaths(Admin hbaseAdmin, List tableNames) throws IOException { HashSet result = new HashSet(); for (String tableName : tableNames) { diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java index c55bde4..ad5b647 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ExtendCubeToHybridCLI.java @@ -26,8 +26,9 @@ import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.TableName; import 
org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.kylin.common.KylinConfig; @@ -232,9 +233,9 @@ public class ExtendCubeToHybridCLI { Serializer projectSerializer = new JsonSerializer(ProjectInstance.class); ProjectInstance project = store.getResource(projectResPath, ProjectInstance.class, projectSerializer); String projUUID = project.getUuid(); - HTableInterface aclHtable = null; + Table aclHtable = null; try { - aclHtable = HBaseConnection.get(kylinConfig.getStorageUrl()).getTable(kylinConfig.getMetadataUrlPrefix() + "_acl"); + aclHtable = HBaseConnection.get(kylinConfig.getStorageUrl()).getTable(TableName.valueOf(kylinConfig.getMetadataUrlPrefix() + "_acl")); // cube acl Result result = aclHtable.get(new Get(Bytes.toBytes(origCubeId))); @@ -254,7 +255,6 @@ public class ExtendCubeToHybridCLI { aclHtable.put(put); } } - aclHtable.flushCommits(); } finally { IOUtils.closeQuietly(aclHtable); } diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java index 8a88b6d..760ed3a 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java @@ -28,9 +28,9 @@ import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -75,7 +75,7 @@ public class GridTableHBaseBenchmark { System.out.println("Testing grid table scanning, hit ratio " + hitRatio + ", index ratio " + indexRatio); String hbaseUrl = "hbase"; // use hbase-site.xml on classpath - HConnection conn = HBaseConnection.get(hbaseUrl); + Connection conn = HBaseConnection.get(hbaseUrl); createHTableIfNeeded(conn, TEST_TABLE); prepareData(conn); @@ -91,10 +91,10 @@ public class GridTableHBaseBenchmark { } - private static void testColumnScan(HConnection conn, List> colScans) throws IOException { + private static void testColumnScan(Connection conn, List> colScans) throws IOException { Stats stats = new Stats("COLUMN_SCAN"); - HTableInterface table = conn.getTable(TEST_TABLE); + Table table = conn.getTable(TableName.valueOf(TEST_TABLE)); try { stats.markStart(); @@ -122,20 +122,20 @@ public class GridTableHBaseBenchmark { } } - private static void testRowScanNoIndexFullScan(HConnection conn, boolean[] hits) throws IOException { + private static void testRowScanNoIndexFullScan(Connection conn, boolean[] hits) throws IOException { fullScan(conn, hits, new Stats("ROW_SCAN_NO_IDX_FULL")); } - private static void testRowScanNoIndexSkipScan(HConnection conn, boolean[] hits) throws IOException { + private static void testRowScanNoIndexSkipScan(Connection conn, boolean[] hits) throws IOException { jumpScan(conn, 
hits, new Stats("ROW_SCAN_NO_IDX_SKIP")); } - private static void testRowScanWithIndex(HConnection conn, boolean[] hits) throws IOException { + private static void testRowScanWithIndex(Connection conn, boolean[] hits) throws IOException { jumpScan(conn, hits, new Stats("ROW_SCAN_IDX")); } - private static void fullScan(HConnection conn, boolean[] hits, Stats stats) throws IOException { - HTableInterface table = conn.getTable(TEST_TABLE); + private static void fullScan(Connection conn, boolean[] hits, Stats stats) throws IOException { + Table table = conn.getTable(TableName.valueOf(TEST_TABLE)); try { stats.markStart(); @@ -156,11 +156,11 @@ public class GridTableHBaseBenchmark { } } - private static void jumpScan(HConnection conn, boolean[] hits, Stats stats) throws IOException { + private static void jumpScan(Connection conn, boolean[] hits, Stats stats) throws IOException { final int jumpThreshold = 6; // compensate for Scan() overhead, totally by experience - HTableInterface table = conn.getTable(TEST_TABLE); + Table table = conn.getTable(TableName.valueOf(TEST_TABLE)); try { stats.markStart(); @@ -204,8 +204,8 @@ public class GridTableHBaseBenchmark { } } - private static void prepareData(HConnection conn) throws IOException { - HTableInterface table = conn.getTable(TEST_TABLE); + private static void prepareData(Connection conn) throws IOException { + Table table = conn.getTable(TableName.valueOf(TEST_TABLE)); try { // check how many rows existing @@ -258,8 +258,8 @@ public class GridTableHBaseBenchmark { return bytes; } - private static void createHTableIfNeeded(HConnection conn, String tableName) throws IOException { - HBaseAdmin hbase = new HBaseAdmin(conn); + private static void createHTableIfNeeded(Connection conn, String tableName) throws IOException { + Admin hbase = conn.getAdmin(); try { boolean tableExist = false; diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseClean.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseClean.java index 621909a..f5b315d 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseClean.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseClean.java @@ -29,10 +29,15 @@ import org.apache.commons.cli.Options; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.util.ToolRunner; +import org.apache.kylin.common.KylinConfig; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.metadata.realization.IRealizationConstants; +import org.apache.kylin.storage.hbase.HBaseConnection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,8 +83,8 @@ public class HBaseClean extends AbstractHadoopJob { private void cleanUp() { try { // get all kylin hbase tables - Configuration conf = HBaseConfiguration.create(); - HBaseAdmin hbaseAdmin = new HBaseAdmin(conf); + Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()); + Admin hbaseAdmin = conn.getAdmin(); String tableNamePrefix = IRealizationConstants.SharedHbaseStorageLocationPrefix; HTableDescriptor[] tableDescriptors = hbaseAdmin.listTables(tableNamePrefix + ".*"); List 
allTablesNeedToBeDropped = Lists.newArrayList(); @@ -94,12 +99,12 @@ public class HBaseClean extends AbstractHadoopJob { // drop tables for (String htableName : allTablesNeedToBeDropped) { logger.info("Deleting HBase table " + htableName); - if (hbaseAdmin.tableExists(htableName)) { - if (hbaseAdmin.isTableEnabled(htableName)) { - hbaseAdmin.disableTable(htableName); + if (hbaseAdmin.tableExists(TableName.valueOf(htableName))) { + if (hbaseAdmin.isTableEnabled(TableName.valueOf(htableName))) { + hbaseAdmin.disableTable(TableName.valueOf(htableName)); } - hbaseAdmin.deleteTable(htableName); + hbaseAdmin.deleteTable(TableName.valueOf(htableName)); logger.info("Deleted HBase table " + htableName); } else { logger.info("HBase table" + htableName + " does not exist"); diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseRegionSizeCalculator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseRegionSizeCalculator.java index 346c3a2..58aa8fd 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseRegionSizeCalculator.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseRegionSizeCalculator.java @@ -20,22 +20,11 @@ package org.apache.kylin.storage.hbase.util; import java.io.IOException; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Map; -import java.util.Set; -import java.util.TreeMap; -import java.util.TreeSet; +import java.util.*; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.ClusterStatus; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.RegionLoad; -import org.apache.hadoop.hbase.ServerLoad; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.util.Bytes; import org.apache.kylin.common.util.Pair; import org.slf4j.Logger; @@ -57,30 +46,31 @@ public class HBaseRegionSizeCalculator { /** * Computes size of each region for table and given column families. * */ - public HBaseRegionSizeCalculator(HTable table) throws IOException { - this(table, new HBaseAdmin(table.getConfiguration())); - } - - /** Constructor for unit testing */ - HBaseRegionSizeCalculator(HTable table, HBaseAdmin hBaseAdmin) throws IOException { + public HBaseRegionSizeCalculator(String tableName, Connection hbaseConnection) throws IOException { + Table table = null; + Admin admin = null; try { + table = hbaseConnection.getTable(TableName.valueOf(tableName)); + admin = hbaseConnection.getAdmin(); + if (!enabled(table.getConfiguration())) { logger.info("Region size calculation disabled."); return; } - logger.info("Calculating region sizes for table \"" + new String(table.getTableName()) + "\"."); + logger.info("Calculating region sizes for table \"" + table.getName() + "\"."); // Get regions for table. 
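// The change just below drops HTable.getRegionLocations(), which no longer
// exists in the 1.x client, and reads region metadata through a RegionLocator
// obtained from the Connection. A minimal standalone sketch of that lookup,
// assuming an open Connection; the class name RegionNameSketch and the method
// are hypothetical, not part of this patch.

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionLocator;

class RegionNameSketch {
    static void printRegionNames(Connection conn, String tableName) throws IOException {
        // RegionLocator is Closeable and cheap to obtain per table
        try (RegionLocator locator = conn.getRegionLocator(TableName.valueOf(tableName))) {
            List<HRegionLocation> locations = locator.getAllRegionLocations();
            for (HRegionLocation location : locations) {
                // the region name is the key later matched against ClusterStatus region loads
                System.out.println(location.getRegionInfo().getRegionNameAsString());
            }
        }
    }
}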
- Set tableRegionInfos = table.getRegionLocations().keySet(); + RegionLocator regionLocator = hbaseConnection.getRegionLocator(table.getName()); + List regionLocationList = regionLocator.getAllRegionLocations(); Set tableRegions = new TreeSet(Bytes.BYTES_COMPARATOR); - for (HRegionInfo regionInfo : tableRegionInfos) { - tableRegions.add(regionInfo.getRegionName()); + for (HRegionLocation hRegionLocation : regionLocationList) { + tableRegions.add(hRegionLocation.getRegionInfo().getRegionName()); } - ClusterStatus clusterStatus = hBaseAdmin.getClusterStatus(); + ClusterStatus clusterStatus = admin.getClusterStatus(); Collection servers = clusterStatus.getServers(); final long megaByte = 1024L * 1024L; @@ -104,7 +94,7 @@ public class HBaseRegionSizeCalculator { } } } finally { - hBaseAdmin.close(); + admin.close(); } } diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseUsage.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseUsage.java index ffb1e25..f2578c0 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseUsage.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HBaseUsage.java @@ -26,11 +26,15 @@ import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.kylin.common.KylinConfig; import org.apache.kylin.metadata.realization.IRealizationConstants; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import org.apache.kylin.storage.hbase.HBaseConnection; public class HBaseUsage { @@ -42,8 +46,8 @@ public class HBaseUsage { Map> envs = Maps.newHashMap(); // get all kylin hbase tables - Configuration conf = HBaseConfiguration.create(); - HBaseAdmin hbaseAdmin = new HBaseAdmin(conf); + Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()); + Admin hbaseAdmin = conn.getAdmin(); String tableNamePrefix = IRealizationConstants.SharedHbaseStorageLocationPrefix; HTableDescriptor[] tableDescriptors = hbaseAdmin.listTables(tableNamePrefix + ".*"); for (HTableDescriptor desc : tableDescriptors) { diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HbaseStreamingInput.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HbaseStreamingInput.java index b5c9b1d..79f9fb3 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HbaseStreamingInput.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HbaseStreamingInput.java @@ -32,16 +32,18 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HConnectionManager; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Put; import 
org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy; +import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.storage.hbase.HBaseConnection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,11 +59,11 @@ public class HbaseStreamingInput { private static final byte[] QN = "C".getBytes(); public static void createTable(String tableName) throws IOException { - HConnection conn = getConnection(); - HBaseAdmin hadmin = new HBaseAdmin(conn); + Connection conn = getConnection(); + Admin hadmin = conn.getAdmin(); try { - boolean tableExist = hadmin.tableExists(tableName); + boolean tableExist = hadmin.tableExists(TableName.valueOf(tableName)); if (tableExist) { logger.info("HTable '" + tableName + "' already exists"); return; @@ -118,8 +120,8 @@ public class HbaseStreamingInput { e.printStackTrace(); } - HConnection conn = getConnection(); - HTableInterface table = conn.getTable(tableName); + Connection conn = getConnection(); + Table table = conn.getTable(TableName.valueOf(tableName)); byte[] key = new byte[8 + 4];//time + id @@ -134,7 +136,7 @@ public class HbaseStreamingInput { Bytes.putInt(key, 8, i); Put put = new Put(key); byte[] cell = randomBytes(CELL_SIZE); - put.add(CF, QN, cell); + put.addColumn(CF, QN, cell); buffer.add(put); } table.put(buffer); @@ -169,8 +171,8 @@ public class HbaseStreamingInput { } Random r = new Random(); - HConnection conn = getConnection(); - HTableInterface table = conn.getTable(tableName); + Connection conn = getConnection(); + Table table = conn.getTable(TableName.valueOf(tableName)); long leftBound = getFirstKeyTime(table); long rightBound = System.currentTimeMillis(); @@ -205,7 +207,7 @@ public class HbaseStreamingInput { } } - private static long getFirstKeyTime(HTableInterface table) throws IOException { + private static long getFirstKeyTime(Table table) throws IOException { long startTime = 0; Scan scan = new Scan(); @@ -223,8 +225,8 @@ public class HbaseStreamingInput { } - private static HConnection getConnection() throws IOException { - return HConnectionManager.createConnection(HBaseConfiguration.create()); + private static Connection getConnection() throws IOException { + return HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()); } private static String formatTime(long time) { diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HtableAlterMetadataCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HtableAlterMetadataCLI.java index 239adcf..095743f 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HtableAlterMetadataCLI.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HtableAlterMetadataCLI.java @@ -27,9 +27,13 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.util.ToolRunner; +import org.apache.kylin.common.KylinConfig; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; +import org.apache.kylin.storage.hbase.HBaseConnection; import 
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HtableAlterMetadataCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HtableAlterMetadataCLI.java
index 239adcf..095743f 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HtableAlterMetadataCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HtableAlterMetadataCLI.java
@@ -27,9 +27,13 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -70,8 +74,8 @@ public class HtableAlterMetadataCLI extends AbstractHadoopJob {
     }
 
     private void alter() throws IOException {
-        Configuration conf = HBaseConfiguration.create();
-        HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
+        Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
+        Admin hbaseAdmin = conn.getAdmin();
         HTableDescriptor table = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
 
         hbaseAdmin.disableTable(table.getTableName());
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/OrphanHBaseCleanJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/OrphanHBaseCleanJob.java
index f0618c9..b7303a5 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/OrphanHBaseCleanJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/OrphanHBaseCleanJob.java
@@ -31,10 +31,15 @@ import org.apache.commons.cli.Options;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.metadata.realization.IRealizationConstants;
+import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -86,7 +91,8 @@ public class OrphanHBaseCleanJob extends AbstractHadoopJob {
 
     private void cleanUnusedHBaseTables(Configuration conf) throws IOException {
         // get all kylin hbase tables
-        HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
+        Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
+        Admin hbaseAdmin = conn.getAdmin();
         String tableNamePrefix = IRealizationConstants.SharedHbaseStorageLocationPrefix;
         HTableDescriptor[] tableDescriptors = hbaseAdmin.listTables(tableNamePrefix + ".*");
         List<String> allTablesNeedToBeDropped = new ArrayList<String>();
@@ -105,12 +111,12 @@ public class OrphanHBaseCleanJob extends AbstractHadoopJob {
         // drop tables
         for (String htableName : allTablesNeedToBeDropped) {
             logger.info("Deleting HBase table " + htableName);
-            if (hbaseAdmin.tableExists(htableName)) {
-                if (hbaseAdmin.isTableEnabled(htableName)) {
-                    hbaseAdmin.disableTable(htableName);
+            if (hbaseAdmin.tableExists(TableName.valueOf(htableName))) {
+                if (hbaseAdmin.isTableEnabled(TableName.valueOf(htableName))) {
+                    hbaseAdmin.disableTable(TableName.valueOf(htableName));
                 }
-                hbaseAdmin.deleteTable(htableName);
+                hbaseAdmin.deleteTable(TableName.valueOf(htableName));
                 logger.info("Deleted HBase table " + htableName);
             } else {
                 logger.info("HBase table" + htableName + " does not exist");
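
Note for reviewers: every String-taking Admin overload (tableExists, isTableEnabled, disableTable, deleteTable) is gone in the 1.x API, hence the `TableName.valueOf()` wrapping above. The disable-then-delete sequence repeated in OrphanHBaseCleanJob and StorageCleanupJob could be factored into a helper along these lines; DropTableHelper is an illustrative sketch, not part of this patch:

    import java.io.IOException;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;

    public final class DropTableHelper {
        public static void dropIfExists(Admin admin, String name) throws IOException {
            TableName tableName = TableName.valueOf(name);
            if (admin.tableExists(tableName)) {
                if (admin.isTableEnabled(tableName)) {
                    // deleteTable() throws on an enabled table, so disable first.
                    admin.disableTable(tableName);
                }
                admin.deleteTable(tableName);
            }
        }
    }
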
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PingHBaseCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PingHBaseCLI.java
index 58ef7cb..b039a03 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PingHBaseCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/PingHBaseCLI.java
@@ -21,9 +21,10 @@ package org.apache.kylin.storage.hbase.util;
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
@@ -57,12 +58,12 @@ public class PingHBaseCLI {
         Scan scan = new Scan();
         int limit = 20;
 
-        HConnection conn = null;
-        HTableInterface table = null;
+        Connection conn = null;
+        Table table = null;
         ResultScanner scanner = null;
         try {
-            conn = HConnectionManager.createConnection(hconf);
-            table = conn.getTable(hbaseTable);
+            conn = ConnectionFactory.createConnection(hconf);
+            table = conn.getTable(TableName.valueOf(hbaseTable));
             scanner = table.getScanner(scan);
             int count = 0;
             for (Result r : scanner) {
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/RowCounterCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/RowCounterCLI.java
index 0bf2fc9..dbeab73 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/RowCounterCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/RowCounterCLI.java
@@ -23,9 +23,10 @@ import java.util.Iterator;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.kylin.common.util.Bytes;
@@ -70,8 +71,8 @@ public class RowCounterCLI {
 
         logger.info("My Scan " + scan.toString());
 
-        HConnection conn = HConnectionManager.createConnection(conf);
-        HTableInterface tableInterface = conn.getTable(htableName);
+        Connection conn = ConnectionFactory.createConnection(conf);
+        Table tableInterface = conn.getTable(TableName.valueOf(htableName));
 
         Iterator<Result> iterator = tableInterface.getScanner(scan).iterator();
         int counter = 0;
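
Note for reviewers: `HConnectionManager.createConnection()` maps one-to-one onto `ConnectionFactory.createConnection()` in these two CLIs. A minimal read-path sketch under the new API; the class name ScanExample and the table name are illustrative:

    import java.io.IOException;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.client.Table;

    public class ScanExample {
        public static void main(String[] args) throws IOException {
            // Connection, Table, and ResultScanner are all Closeable,
            // so try-with-resources replaces the manual finally blocks.
            try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
                 Table table = conn.getTable(TableName.valueOf("kylin_demo"));
                 ResultScanner scanner = table.getScanner(new Scan())) {
                int count = 0;
                for (Result r : scanner) {
                    count++;
                }
                System.out.println(count + " rows scanned");
            }
        }
    }
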
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
index c010d51..fd39ee6 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
@@ -35,7 +35,9 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.CliCommandExecutor;
@@ -52,6 +54,7 @@ import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.manager.ExecutableManager;
 import org.apache.kylin.metadata.realization.IRealizationConstants;
+import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -104,7 +107,8 @@ public class StorageCleanupJob extends AbstractHadoopJob {
         IIManager iiManager = IIManager.getInstance(KylinConfig.getInstanceFromEnv());
 
         // get all kylin hbase tables
-        HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
+        Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
+        Admin hbaseAdmin = conn.getAdmin();
         String tableNamePrefix = IRealizationConstants.SharedHbaseStorageLocationPrefix;
         HTableDescriptor[] tableDescriptors = hbaseAdmin.listTables(tableNamePrefix + ".*");
         List<String> allTablesNeedToBeDropped = new ArrayList<String>();
@@ -148,12 +152,12 @@ public class StorageCleanupJob extends AbstractHadoopJob {
         // drop tables
         for (String htableName : allTablesNeedToBeDropped) {
             logger.info("Deleting HBase table " + htableName);
-            if (hbaseAdmin.tableExists(htableName)) {
-                if (hbaseAdmin.isTableEnabled(htableName)) {
-                    hbaseAdmin.disableTable(htableName);
+            if (hbaseAdmin.tableExists(TableName.valueOf(htableName))) {
+                if (hbaseAdmin.isTableEnabled(TableName.valueOf(htableName))) {
+                    hbaseAdmin.disableTable(TableName.valueOf(htableName));
                 }
-                hbaseAdmin.deleteTable(htableName);
+                hbaseAdmin.deleteTable(TableName.valueOf(htableName));
                 logger.info("Deleted HBase table " + htableName);
             } else {
                 logger.info("HBase table" + htableName + " does not exist");
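
Note for reviewers: these jobs deliberately leave the Connection open. HBaseConnection.get() hands out a connection cached per storage URL and shared JVM-wide, so closing it here would break other callers. An illustrative sketch of that caching idea; this is NOT Kylin's actual HBaseConnection implementation, and the class and field names are made up:

    import java.io.IOException;
    import java.util.concurrent.ConcurrentHashMap;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;

    public class CachedConnections {
        // One heavyweight Connection per storage URL, reused by all callers.
        private static final ConcurrentHashMap<String, Connection> CACHE = new ConcurrentHashMap<>();

        public static Connection get(String url) throws IOException {
            Connection conn = CACHE.get(url);
            if (conn == null) {
                synchronized (CachedConnections.class) {
                    conn = CACHE.get(url);
                    if (conn == null) {
                        // A real implementation would derive the Configuration from the URL.
                        conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
                        CACHE.put(url, conn);
                    }
                }
            }
            return conn;
        }
    }
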
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/UpdateHTableHostCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/UpdateHTableHostCLI.java
index 23357f5..3fec98c 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/UpdateHTableHostCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/UpdateHTableHostCLI.java
@@ -26,9 +26,12 @@ import java.util.List;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -39,7 +42,6 @@ import org.apache.kylin.invertedindex.IISegment;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.realization.IRealizationConstants;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
-import org.apache.kylin.storage.hbase.HBaseConnection;
 
 import com.google.common.collect.Lists;
 
@@ -52,14 +54,15 @@ public class UpdateHTableHostCLI {
     private List<String> errorMsgs = Lists.newArrayList();
 
     private List<String> htables;
-    private HBaseAdmin hbaseAdmin;
+    private Admin hbaseAdmin;
     private KylinConfig kylinConfig;
     private String oldHostValue;
 
     public UpdateHTableHostCLI(List<String> htables, String oldHostValue) throws IOException {
         this.htables = htables;
         this.oldHostValue = oldHostValue;
-        this.hbaseAdmin = new HBaseAdmin(HBaseConnection.getCurrentHBaseConfiguration());
+        Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
+        hbaseAdmin = conn.getAdmin();
         this.kylinConfig = KylinConfig.getInstanceFromEnv();
     }
 
@@ -181,9 +184,9 @@ public class UpdateHTableHostCLI {
         HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
         if (oldHostValue.equals(desc.getValue(IRealizationConstants.HTableTag))) {
             desc.setValue(IRealizationConstants.HTableTag, kylinConfig.getMetadataUrlPrefix());
-            hbaseAdmin.disableTable(tableName);
-            hbaseAdmin.modifyTable(tableName, desc);
-            hbaseAdmin.enableTable(tableName);
+            hbaseAdmin.disableTable(TableName.valueOf(tableName));
+            hbaseAdmin.modifyTable(TableName.valueOf(tableName), desc);
+            hbaseAdmin.enableTable(TableName.valueOf(tableName));
 
             updatedResources.add(tableName);
         }
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserverTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserverTest.java
index cd4e33d..9585575 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserverTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserverTest.java
@@ -29,12 +29,10 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.metadata.datatype.LongMutable;
@@ -233,15 +231,8 @@ public class AggregateRegionObserverTest {
             return nextRaw(results);
         }
 
-        /*
-         * (non-Javadoc)
-         *
-         * @see
-         * org.apache.hadoop.hbase.regionserver.InternalScanner#next(java.util
-         * .List, int)
-         */
         @Override
-        public boolean next(List<Cell> result, int limit) throws IOException {
+        public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
             return next(result);
         }
 
@@ -310,6 +301,11 @@ public class AggregateRegionObserverTest {
             return 0;
         }
 
+        @Override
+        public int getBatch() {
+            return 0;
+        }
+
         /*
          * (non-Javadoc)
          *
@@ -326,16 +322,9 @@ public class AggregateRegionObserverTest {
             return i < input.size();
         }
 
-        /*
-         * (non-Javadoc)
-         *
-         * @see
-         * org.apache.hadoop.hbase.regionserver.RegionScanner#nextRaw(java.util
-         * .List, int)
-         */
         @Override
-        public boolean nextRaw(List<Cell> result, int limit) throws IOException {
-            return nextRaw(result);
+        public boolean nextRaw(List<Cell> list, ScannerContext scannerContext) throws IOException {
+            return false;
         }
     }
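
Note for reviewers: the test stub changes above track the scanner interfaces in HBase 1.x, where the `int limit` parameter of InternalScanner.next() and RegionScanner.nextRaw() was replaced by ScannerContext, and RegionScanner grew a getBatch() method. A minimal sketch of the new signature shape; ScannerStub is illustrative, and a real RegionScanner stub must still implement the rest of the interface:

    import java.io.IOException;
    import java.util.List;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.regionserver.InternalScanner;
    import org.apache.hadoop.hbase.regionserver.ScannerContext;

    abstract class ScannerStub implements InternalScanner {
        @Override
        public boolean next(List<Cell> results, ScannerContext context) throws IOException {
            // The old next(List<Cell>, int limit) overload no longer exists;
            // batching limits now travel inside ScannerContext.
            return next(results);
        }
    }
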
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/filter/TestFuzzyRowFilterV2EndToEnd.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/filter/TestFuzzyRowFilterV2EndToEnd.java
index 1d85922..04e2e8b 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/filter/TestFuzzyRowFilterV2EndToEnd.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/filter/TestFuzzyRowFilterV2EndToEnd.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.filter.FilterList.Operator;
 import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
@@ -136,7 +137,7 @@ public class TestFuzzyRowFilterV2EndToEnd {
 
                         Put p = new Put(rk);
                         p.setDurability(Durability.SKIP_WAL);
-                        p.add(cf.getBytes(), cq, Bytes.toBytes(c));
+                        p.addColumn(cf.getBytes(), cq, Bytes.toBytes(c));
                         ht.put(p);
                     }
                 }
@@ -224,7 +225,7 @@ public class TestFuzzyRowFilterV2EndToEnd {
         scan.addFamily(cf.getBytes());
         scan.setFilter(filter);
         List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(table.getBytes());
-        HRegion first = regions.get(0);
+        Region first = regions.get(0);
         first.getScanner(scan);
         RegionScanner scanner = first.getScanner(scan);
         List<Cell> results = new ArrayList<Cell>();
-- 
2.7.2