From ee4a09418fd7d78dec6ab1863394d3ffff232b27 Mon Sep 17 00:00:00 2001
From: zhangduo
Date: Thu, 14 Feb 2019 21:12:17 +0800
Subject: [PATCH] HBASE-21717 Implement Connection based on AsyncConnection
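
Implement the synchronous Connection/Table API on top of AsyncConnection:

* Add ConnectionOverAsyncConnection and TableOverAsyncTable, and make
  ConnectionOverAsyncConnection the default connection implementation in
  ConnectionFactory.
* Add Connection.toAsyncConnection() and AsyncConnection.toConnection() for
  converting between the two flavours.
* Add SharedAsyncConnection and move SharedConnection into the client package;
  remove the short-circuiting ClusterConnection and MultiHConnection.
* Update tests and tools to use the Table interface instead of casting to
  HTable.

A minimal usage sketch of the new conversion methods (table, family, and row
names below are placeholders for illustration only, not part of this patch):

  Configuration conf = HBaseConfiguration.create();
  try (Connection conn = ConnectionFactory.createConnection(conf)) {
    // The default Connection is now backed by an AsyncConnection.
    AsyncConnection asyncConn = conn.toAsyncConnection();
    try (Table table = conn.getTable(TableName.valueOf("test"))) {
      table.put(new Put(Bytes.toBytes("row"))
        .addColumn(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v")));
    }
    // Converting back yields the synchronous view over the same async connection.
    Connection sameConn = asyncConn.toConnection();
  }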
---
.../hadoop/hbase/backup/TestBackupBase.java | 5 +-
.../hadoop/hbase/backup/TestBackupMerge.java | 11 +-
.../backup/TestBackupMultipleDeletes.java | 10 +-
.../hbase/backup/TestIncrementalBackup.java | 16 +-
.../TestIncrementalBackupDeleteTable.java | 10 +-
...estIncrementalBackupMergeWithFailures.java | 7 +-
.../TestIncrementalBackupWithBulkLoad.java | 6 +-
.../TestIncrementalBackupWithFailures.java | 6 +-
.../hadoop/hbase/backup/TestRemoteBackup.java | 8 +-
.../backup/master/TestBackupLogCleaner.java | 6 +-
.../hadoop/hbase/client/AsyncConnection.java | 8 +
.../hbase/client/AsyncConnectionImpl.java | 49 +-
.../hbase/client/AsyncMetaRegionLocator.java | 2 +-
.../hadoop/hbase/client/Connection.java | 17 +-
.../hbase/client/ConnectionFactory.java | 44 +-
.../client/ConnectionImplementation.java | 54 +-
.../client/ConnectionOverAsyncConnection.java | 180 ++++++
.../hadoop/hbase/client/ConnectionUtils.java | 104 ++--
.../apache/hadoop/hbase/client/HTable.java | 213 +------
.../RegionCoprocessorRpcChannelImpl.java | 17 +-
.../org/apache/hadoop/hbase/client/Table.java | 540 ++++--------------
.../hbase/client/TableOverAsyncTable.java | 489 ++++++++++++++++
.../hbase/ipc/CoprocessorRpcChannel.java | 12 +-
.../hadoop/hbase/client/SimpleRegistry.java | 66 +++
.../hadoop/hbase/client/TestAsyncProcess.java | 2 +-
.../hbase/client/TestBufferedMutator.java | 14 +-
.../hbase/client/TestClientNoCluster.java | 33 +-
.../mapreduce/TestHFileOutputFormat2.java | 27 +-
.../TestMultiTableInputFormatBase.java | 6 +
.../mapreduce/TestTableInputFormatBase.java | 6 +
.../hadoop/hbase/rest/SchemaResource.java | 18 +-
.../hbase/rest/client/RemoteHTable.java | 268 +++------
.../hbase/rest/client/TestRemoteTable.java | 7 +-
.../hbase/client/SharedAsyncConnection.java | 112 ++++
.../hbase/{ => client}/SharedConnection.java | 12 +-
.../apache/hadoop/hbase/master/HMaster.java | 5 +-
.../hbase/master/MasterCoprocessorHost.java | 2 +-
.../hbase/regionserver/HRegionServer.java | 68 +--
.../regionserver/RegionCoprocessorHost.java | 2 +-
.../RegionServerCoprocessorHost.java | 2 +-
.../hadoop/hbase/util/MultiHConnection.java | 141 -----
.../resources/hbase-webapps/master/table.jsp | 5 +-
.../hadoop/hbase/HBaseTestingUtility.java | 95 +--
.../AbstractTestCIOperationTimeout.java | 4 +-
.../client/AbstractTestCIRpcTimeout.java | 2 +-
.../hbase/client/AbstractTestCITimeout.java | 2 +-
.../client/DummyAsyncClusterConnection.java | 5 +
.../hadoop/hbase/client/TestAdmin1.java | 285 ++++-----
.../hadoop/hbase/client/TestAdmin2.java | 132 ++---
.../hbase/client/TestAlwaysSetScannerId.java | 34 +-
.../hbase/client/TestAsyncTableAdminApi.java | 2 +-
.../hbase/client/TestCIBadHostname.java | 28 +-
.../hadoop/hbase/client/TestCISleep.java | 53 +-
.../hbase/client/TestClientPushback.java | 214 +++----
.../client/TestConnectionImplementation.java | 3 +
.../hbase/client/TestFromClientSide.java | 46 +-
.../hbase/client/TestFromClientSide3.java | 37 +-
.../hbase/client/TestGetProcedureResult.java | 7 +-
.../client/TestIncrementsFromClientSide.java | 41 +-
.../hadoop/hbase/client/TestMetaCache.java | 5 +
.../hbase/client/TestMetaWithReplicas.java | 9 +-
.../TestMultiActionMetricsFromClient.java | 13 +-
.../hbase/client/TestMultiParallel.java | 61 +-
.../client/TestRegionLocationCaching.java | 5 +
.../hbase/client/TestReplicaWithCluster.java | 5 +
.../hbase/client/TestReplicasClient.java | 36 +-
.../client/TestScanWithoutFetchingData.java | 32 +-
.../client/TestShortCircuitConnection.java | 95 ---
.../client/TestSnapshotCloneIndependence.java | 2 +-
.../hbase/client/TestSnapshotMetadata.java | 2 +-
.../TestCoprocessorShortCircuitRPC.java | 12 +-
.../TestPassCustomCellViaRegionObserver.java | 5 +-
.../hbase/filter/TestMultiRowRangeFilter.java | 45 +-
.../hadoop/hbase/master/TestWarmupRegion.java | 4 +-
.../hbase/regionserver/RegionAsTable.java | 120 +---
.../TestEndToEndSplitTransaction.java | 2 +-
.../regionserver/TestHRegionFileSystem.java | 17 +-
.../TestNewVersionBehaviorFromClientSide.java | 7 +-
.../TestPerColumnFamilyFlush.java | 7 -
.../TestSettingTimeoutOnBlockingPoint.java | 14 +-
.../replication/TestReplicationBase.java | 2 +-
...estCoprocessorWhitelistMasterObserver.java | 10 +-
.../snapshot/TestRegionSnapshotTask.java | 2 +-
.../hbase/util/MultiThreadedAction.java | 5 +-
.../util/hbck/OfflineMetaRebuildTestCore.java | 9 +-
.../thrift/ThriftHBaseServiceHandler.java | 12 +-
.../thrift2/ThriftHBaseServiceHandler.java | 2 +-
.../thrift2/client/ThriftConnection.java | 6 +
.../hbase/thrift2/client/ThriftTable.java | 4 +-
.../hbase/thrift2/TestThriftConnection.java | 2 +-
90 files changed, 2030 insertions(+), 2122 deletions(-)
create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java
create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java
create mode 100644 hbase-client/src/test/java/org/apache/hadoop/hbase/client/SimpleRegistry.java
create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/client/SharedAsyncConnection.java
rename hbase-server/src/main/java/org/apache/hadoop/hbase/{ => client}/SharedConnection.java (89%)
delete mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/util/MultiHConnection.java
delete mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestShortCircuitConnection.java
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
index 2afdb4f4c1..e0fca20b54 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
@@ -54,7 +54,6 @@ import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
@@ -354,9 +353,9 @@ public class TestBackupBase {
TEST_UTIL.shutdownMiniMapReduceCluster();
}
- HTable insertIntoTable(Connection conn, TableName table, byte[] family, int id, int numRows)
+ Table insertIntoTable(Connection conn, TableName table, byte[] family, int id, int numRows)
throws IOException {
- HTable t = (HTable) conn.getTable(table);
+ Table t = conn.getTable(table);
Put p1;
for (int i = 0; i < numRows; i++) {
p1 = new Put(Bytes.toBytes("row-" + table + "-" + id + "-" + i));
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupMerge.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupMerge.java
index 8ead548110..beacef3c14 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupMerge.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupMerge.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.backup;
import static org.junit.Assert.assertTrue;
import java.util.List;
-
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
@@ -28,10 +27,8 @@ import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
@@ -39,6 +36,8 @@ import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+
@Category(LargeTests.class)
public class TestBackupMerge extends TestBackupBase {
@@ -72,14 +71,14 @@ public class TestBackupMerge extends TestBackupBase {
assertTrue(checkSucceeded(backupIdFull));
// #2 - insert some data to table1
- HTable t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS);
+ Table t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS);
LOG.debug("writing " + ADD_ROWS + " rows to " + table1);
Assert.assertEquals(TEST_UTIL.countRows(t1), NB_ROWS_IN_BATCH + ADD_ROWS);
t1.close();
LOG.debug("written " + ADD_ROWS + " rows to " + table1);
- HTable t2 = insertIntoTable(conn, table2, famName, 1, ADD_ROWS);
+ Table t2 = insertIntoTable(conn, table2, famName, 1, ADD_ROWS);
Assert.assertEquals(TEST_UTIL.countRows(t2), NB_ROWS_IN_BATCH + ADD_ROWS);
t2.close();
@@ -115,7 +114,7 @@ public class TestBackupMerge extends TestBackupBase {
tablesRestoreIncMultiple, tablesMapIncMultiple, true));
Table hTable = conn.getTable(table1_restore);
- LOG.debug("After incremental restore: " + hTable.getTableDescriptor());
+ LOG.debug("After incremental restore: " + hTable.getDescriptor());
int countRows = TEST_UTIL.countRows(hTable, famName);
LOG.debug("f1 has " + countRows + " rows");
Assert.assertEquals(NB_ROWS_IN_BATCH + 2 * ADD_ROWS, countRows);
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupMultipleDeletes.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupMultipleDeletes.java
index db1a4e2e31..bffa480817 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupMultipleDeletes.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupMultipleDeletes.java
@@ -29,8 +29,8 @@ import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Assert;
@@ -67,7 +67,7 @@ public class TestBackupMultipleDeletes extends TestBackupBase {
String backupIdFull = client.backupTables(request);
assertTrue(checkSucceeded(backupIdFull));
// #2 - insert some data to table table1
- HTable t1 = (HTable) conn.getTable(table1);
+ Table t1 = conn.getTable(table1);
Put p1;
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
p1 = new Put(Bytes.toBytes("row-t1" + i));
@@ -82,7 +82,7 @@ public class TestBackupMultipleDeletes extends TestBackupBase {
String backupIdInc1 = client.backupTables(request);
assertTrue(checkSucceeded(backupIdInc1));
// #4 - insert some data to table table2
- HTable t2 = (HTable) conn.getTable(table2);
+ Table t2 = conn.getTable(table2);
Put p2 = null;
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
p2 = new Put(Bytes.toBytes("row-t2" + i));
@@ -95,7 +95,7 @@ public class TestBackupMultipleDeletes extends TestBackupBase {
String backupIdInc2 = client.backupTables(request);
assertTrue(checkSucceeded(backupIdInc2));
// #6 - insert some data to table table1
- t1 = (HTable) conn.getTable(table1);
+ t1 = conn.getTable(table1);
for (int i = NB_ROWS_IN_BATCH; i < 2 * NB_ROWS_IN_BATCH; i++) {
p1 = new Put(Bytes.toBytes("row-t1" + i));
p1.addColumn(famName, qualName, Bytes.toBytes("val" + i));
@@ -107,7 +107,7 @@ public class TestBackupMultipleDeletes extends TestBackupBase {
String backupIdInc3 = client.backupTables(request);
assertTrue(checkSucceeded(backupIdInc3));
// #8 - insert some data to table table2
- t2 = (HTable) conn.getTable(table2);
+ t2 = conn.getTable(table2);
for (int i = NB_ROWS_IN_BATCH; i < 2 * NB_ROWS_IN_BATCH; i++) {
p2 = new Put(Bytes.toBytes("row-t1" + i));
p2.addColumn(famName, qualName, Bytes.toBytes("val" + i));
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java
index 6e1523812a..064c45e80c 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java
@@ -33,8 +33,8 @@ import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -102,7 +102,7 @@ public class TestIncrementalBackup extends TestBackupBase {
assertTrue(checkSucceeded(backupIdFull));
// #2 - insert some data to table
- HTable t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS);
+ Table t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS);
LOG.debug("writing " + ADD_ROWS + " rows to " + table1);
Assert.assertEquals(HBaseTestingUtility.countRows(t1),
NB_ROWS_IN_BATCH + ADD_ROWS + NB_ROWS_FAM3);
@@ -116,7 +116,7 @@ public class TestIncrementalBackup extends TestBackupBase {
Assert.assertEquals(HBaseTestingUtility.countRows(t1),
NB_ROWS_IN_BATCH + ADD_ROWS + NB_ROWS_MOB);
- HTable t2 = (HTable) conn.getTable(table2);
+ Table t2 = conn.getTable(table2);
Put p2;
for (int i = 0; i < 5; i++) {
p2 = new Put(Bytes.toBytes("row-t2" + i));
@@ -163,7 +163,7 @@ public class TestIncrementalBackup extends TestBackupBase {
HBaseTestingUtility.modifyTableSync(TEST_UTIL.getAdmin(), table1Desc);
int NB_ROWS_FAM2 = 7;
- HTable t3 = insertIntoTable(conn, table1, fam2Name, 2, NB_ROWS_FAM2);
+ Table t3 = insertIntoTable(conn, table1, fam2Name, 2, NB_ROWS_FAM2);
t3.close();
// Wait for 5 sec to make sure that old WALs were deleted
@@ -189,11 +189,11 @@ public class TestIncrementalBackup extends TestBackupBase {
hAdmin.close();
// #6.2 - checking row count of tables for full restore
- HTable hTable = (HTable) conn.getTable(table1_restore);
+ Table hTable = conn.getTable(table1_restore);
Assert.assertEquals(HBaseTestingUtility.countRows(hTable), NB_ROWS_IN_BATCH + NB_ROWS_FAM3);
hTable.close();
- hTable = (HTable) conn.getTable(table2_restore);
+ hTable = conn.getTable(table2_restore);
Assert.assertEquals(NB_ROWS_IN_BATCH, HBaseTestingUtility.countRows(hTable));
hTable.close();
@@ -202,7 +202,7 @@ public class TestIncrementalBackup extends TestBackupBase {
TableName[] tablesMapIncMultiple = new TableName[] { table1_restore, table2_restore };
client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backupIdIncMultiple2,
false, tablesRestoreIncMultiple, tablesMapIncMultiple, true));
- hTable = (HTable) conn.getTable(table1_restore);
+ hTable = conn.getTable(table1_restore);
LOG.debug("After incremental restore: " + hTable.getDescriptor());
int countFamName = TEST_UTIL.countRows(hTable, famName);
@@ -218,7 +218,7 @@ public class TestIncrementalBackup extends TestBackupBase {
Assert.assertEquals(countMobName, NB_ROWS_MOB);
hTable.close();
- hTable = (HTable) conn.getTable(table2_restore);
+ hTable = conn.getTable(table2_restore);
Assert.assertEquals(NB_ROWS_IN_BATCH + 5, HBaseTestingUtility.countRows(hTable));
hTable.close();
admin.close();
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupDeleteTable.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupDeleteTable.java
index f8129d9223..08834f2fae 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupDeleteTable.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupDeleteTable.java
@@ -27,8 +27,8 @@ import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Assert;
@@ -75,7 +75,7 @@ public class TestIncrementalBackupDeleteTable extends TestBackupBase {
assertTrue(checkSucceeded(backupIdFull));
// #2 - insert some data to table table1
- HTable t1 = (HTable) conn.getTable(table1);
+ Table t1 = conn.getTable(table1);
Put p1;
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
p1 = new Put(Bytes.toBytes("row-t1" + i));
@@ -110,11 +110,11 @@ public class TestIncrementalBackupDeleteTable extends TestBackupBase {
assertTrue(hAdmin.tableExists(table2_restore));
// #5.2 - checking row count of tables for full restore
- HTable hTable = (HTable) conn.getTable(table1_restore);
+ Table hTable = conn.getTable(table1_restore);
Assert.assertEquals(TEST_UTIL.countRows(hTable), NB_ROWS_IN_BATCH);
hTable.close();
- hTable = (HTable) conn.getTable(table2_restore);
+ hTable = conn.getTable(table2_restore);
Assert.assertEquals(TEST_UTIL.countRows(hTable), NB_ROWS_IN_BATCH);
hTable.close();
@@ -124,7 +124,7 @@ public class TestIncrementalBackupDeleteTable extends TestBackupBase {
client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backupIdIncMultiple,
false, tablesRestoreIncMultiple, tablesMapIncMultiple, true));
- hTable = (HTable) conn.getTable(table1_restore);
+ hTable = conn.getTable(table1_restore);
Assert.assertEquals(TEST_UTIL.countRows(hTable), NB_ROWS_IN_BATCH * 2);
hTable.close();
admin.close();
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupMergeWithFailures.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupMergeWithFailures.java
index 57bdc46409..73512587c4 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupMergeWithFailures.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupMergeWithFailures.java
@@ -39,7 +39,6 @@ import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Pair;
@@ -245,14 +244,14 @@ public class TestIncrementalBackupMergeWithFailures extends TestBackupBase {
assertTrue(checkSucceeded(backupIdFull));
// #2 - insert some data to table1
- HTable t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS);
+ Table t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS);
LOG.debug("writing " + ADD_ROWS + " rows to " + table1);
Assert.assertEquals(TEST_UTIL.countRows(t1), NB_ROWS_IN_BATCH + ADD_ROWS);
t1.close();
LOG.debug("written " + ADD_ROWS + " rows to " + table1);
- HTable t2 = insertIntoTable(conn, table2, famName, 1, ADD_ROWS);
+ Table t2 = insertIntoTable(conn, table2, famName, 1, ADD_ROWS);
Assert.assertEquals(TEST_UTIL.countRows(t2), NB_ROWS_IN_BATCH + ADD_ROWS);
t2.close();
@@ -334,7 +333,7 @@ public class TestIncrementalBackupMergeWithFailures extends TestBackupBase {
tablesRestoreIncMultiple, tablesMapIncMultiple, true));
Table hTable = conn.getTable(table1_restore);
- LOG.debug("After incremental restore: " + hTable.getTableDescriptor());
+ LOG.debug("After incremental restore: " + hTable.getDescriptor());
LOG.debug("f1 has " + TEST_UTIL.countRows(hTable, famName) + " rows");
Assert.assertEquals(TEST_UTIL.countRows(hTable, famName), NB_ROWS_IN_BATCH + 2 * ADD_ROWS);
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java
index 82f0fb7b05..4b02077934 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java
@@ -29,8 +29,8 @@ import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.tool.TestBulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
@@ -79,7 +79,7 @@ public class TestIncrementalBackupWithBulkLoad extends TestBackupBase {
assertTrue(checkSucceeded(backupIdFull));
// #2 - insert some data to table table1
- HTable t1 = (HTable) conn.getTable(table1);
+ Table t1 = conn.getTable(table1);
Put p1;
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
p1 = new Put(Bytes.toBytes("row-t1" + i));
@@ -127,7 +127,7 @@ public class TestIncrementalBackupWithBulkLoad extends TestBackupBase {
client.restore(BackupUtils.createRestoreRequest(BACKUP_ROOT_DIR, backupIdIncMultiple1,
false, tablesRestoreIncMultiple, tablesRestoreIncMultiple, true));
- HTable hTable = (HTable) conn.getTable(table1);
+ Table hTable = conn.getTable(table1);
Assert.assertEquals(TEST_UTIL.countRows(hTable), NB_ROWS_IN_BATCH * 2 + actual + actual1);
request = createBackupRequest(BackupType.FULL, tables, BACKUP_ROOT_DIR);
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithFailures.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithFailures.java
index d5829b2cbe..f6725d9e25 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithFailures.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithFailures.java
@@ -35,8 +35,8 @@ import org.apache.hadoop.hbase.backup.impl.TableBackupClient.Stage;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.ToolRunner;
@@ -100,14 +100,14 @@ public class TestIncrementalBackupWithFailures extends TestBackupBase {
assertTrue(checkSucceeded(backupIdFull));
// #2 - insert some data to table
- HTable t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS);
+ Table t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS);
LOG.debug("writing " + ADD_ROWS + " rows to " + table1);
Assert.assertEquals(TEST_UTIL.countRows(t1), NB_ROWS_IN_BATCH + ADD_ROWS + NB_ROWS_FAM3);
t1.close();
LOG.debug("written " + ADD_ROWS + " rows to " + table1);
- HTable t2 = (HTable) conn.getTable(table2);
+ Table t2 = conn.getTable(table2);
Put p2;
for (int i = 0; i < 5; i++) {
p2 = new Put(Bytes.toBytes("row-t2" + i));
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestRemoteBackup.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestRemoteBackup.java
index a0226e6f2b..05826e204b 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestRemoteBackup.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestRemoteBackup.java
@@ -29,8 +29,8 @@ import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.snapshot.MobSnapshotTestingUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -78,7 +78,7 @@ public class TestRemoteBackup extends TestBackupBase {
} catch (InterruptedException ie) {
}
try {
- HTable t1 = (HTable) conn.getTable(table1);
+ Table t1 = conn.getTable(table1);
Put p1;
for (int i = 0; i < NB_ROWS_IN_FAM3; i++) {
p1 = new Put(Bytes.toBytes("row-t1" + i));
@@ -102,7 +102,7 @@ public class TestRemoteBackup extends TestBackupBase {
HBaseTestingUtility.modifyTableSync(TEST_UTIL.getAdmin(), table1Desc);
SnapshotTestingUtils.loadData(TEST_UTIL, table1, 50, fam2Name);
- HTable t1 = (HTable) conn.getTable(table1);
+ Table t1 = conn.getTable(table1);
int rows0 = MobSnapshotTestingUtils.countMobRows(t1, fam2Name);
latch.countDown();
@@ -130,7 +130,7 @@ public class TestRemoteBackup extends TestBackupBase {
assertTrue(hAdmin.tableExists(table1_restore));
// #5.2 - checking row count of tables for full restore
- HTable hTable = (HTable) conn.getTable(table1_restore);
+ Table hTable = conn.getTable(table1_restore);
Assert.assertEquals(TEST_UTIL.countRows(hTable, famName), NB_ROWS_IN_BATCH);
int cnt3 = TEST_UTIL.countRows(hTable, fam3Name);
Assert.assertTrue(cnt3 >= 0 && cnt3 <= NB_ROWS_IN_FAM3);
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/master/TestBackupLogCleaner.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/master/TestBackupLogCleaner.java
index 9273487f92..6b8011eca5 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/master/TestBackupLogCleaner.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/master/TestBackupLogCleaner.java
@@ -38,8 +38,8 @@ import org.apache.hadoop.hbase.backup.TestBackupBase;
import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
@@ -107,7 +107,7 @@ public class TestBackupLogCleaner extends TestBackupBase {
assertTrue(walFiles.size() < newWalFiles.size());
Connection conn = ConnectionFactory.createConnection(conf1);
// #2 - insert some data to table
- HTable t1 = (HTable) conn.getTable(table1);
+ Table t1 = conn.getTable(table1);
Put p1;
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
p1 = new Put(Bytes.toBytes("row-t1" + i));
@@ -117,7 +117,7 @@ public class TestBackupLogCleaner extends TestBackupBase {
t1.close();
- HTable t2 = (HTable) conn.getTable(table2);
+ Table t2 = conn.getTable(table2);
Put p2;
for (int i = 0; i < 5; i++) {
p2 = new Put(Bytes.toBytes("row-t2" + i));
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
index 75971ad610..0546520bbf 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
@@ -201,6 +201,14 @@ public interface AsyncConnection extends Closeable {
*/
boolean isClosed();
+ /**
+ * Convert this connection to a {@link Connection}.
+ *
+ * Usually the same instance will be returned if you call this method multiple times, so this
+ * can be considered a lightweight operation.
+ */
+ Connection toConnection();
+
/**
* Retrieve an Hbck implementation to fix an HBase cluster. The returned Hbck is not guaranteed to
* be thread-safe. A new instance should be created by each thread. This is a lightweight
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index 7c91e49935..d05062ce54 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -23,6 +23,7 @@ import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLE
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.net.SocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -108,6 +109,8 @@ class AsyncConnectionImpl implements AsyncConnection {
private volatile boolean closed = false;
+ private volatile ConnectionOverAsyncConnection conn;
+
public AsyncConnectionImpl(Configuration conf, AsyncRegistry registry, String clusterId,
SocketAddress localAddress, User user) {
this.conf = conf;
@@ -141,6 +144,11 @@ class AsyncConnectionImpl implements AsyncConnection {
return conf;
}
+ @Override
+ public boolean isClosed() {
+ return closed;
+ }
+
@Override
public void close() {
// As the code below is safe to be executed in parallel, here we do not use CAS or lock, just a
@@ -153,17 +161,21 @@ class AsyncConnectionImpl implements AsyncConnection {
if (authService != null) {
authService.shutdown();
}
+ ConnectionOverAsyncConnection c = this.conn;
+ if (c != null) {
+ c.closeConnImpl();
+ }
closed = true;
}
@Override
- public boolean isClosed() {
- return closed;
+ public AsyncTableRegionLocator getRegionLocator(TableName tableName) {
+ return new AsyncTableRegionLocatorImpl(tableName, this);
}
@Override
- public AsyncTableRegionLocator getRegionLocator(TableName tableName) {
- return new AsyncTableRegionLocatorImpl(tableName, this);
+ public void clearRegionLocationCache() {
+ locator.clearCache();
}
// we will override this method for testing retry caller, so do not remove this method.
@@ -341,6 +353,30 @@ class AsyncConnectionImpl implements AsyncConnection {
RETRY_TIMER);
}
+ @Override
+ public Connection toConnection() {
+ ConnectionOverAsyncConnection c = this.conn;
+ if (c != null) {
+ return c;
+ }
+ synchronized (this) {
+ c = this.conn;
+ if (c != null) {
+ return c;
+ }
+ try {
+ c = new ConnectionOverAsyncConnection(this,
+ ConnectionFactory.createConnectionImpl(conf, null, user));
+ } catch (IOException e) {
+ // TODO: eventually we will not rely on ConnectionImplementation anymore and there will be
+ // no IOException here.
+ throw new UncheckedIOException(e);
+ }
+ this.conn = c;
+ }
+ return c;
+ }
+
@Override
public CompletableFuture getHbck() {
CompletableFuture future = new CompletableFuture<>();
@@ -366,9 +402,4 @@ class AsyncConnectionImpl implements AsyncConnection {
return new HBaseHbck(MasterProtos.HbckService.newBlockingStub(
rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout)), rpcControllerFactory);
}
-
- @Override
- public void clearRegionLocationCache() {
- locator.clearCache();
- }
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java
index f5b3f92f65..2dfaf02e45 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java
@@ -70,7 +70,7 @@ class AsyncMetaRegionLocator {
}
LOG.trace("Meta region location cache is null, try fetching from registry.");
if (metaRelocateFuture.compareAndSet(null, new CompletableFuture<>())) {
- LOG.debug("Start fetching meta region location from registry.");
+ LOG.debug("Start fetching meta region location from registry.", new Exception());
CompletableFuture future = metaRelocateFuture.get();
addListener(registry.getMetaRegionLocation(), (locs, error) -> {
if (error != null) {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java
index 90891f411b..b88c40c6eb 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.yetus.audience.InterfaceAudience;
/**
@@ -124,7 +125,9 @@ public interface Connection extends Abortable, Closeable {
*
* @return a {@link BufferedMutator} for the supplied tableName.
*/
- BufferedMutator getBufferedMutator(TableName tableName) throws IOException;
+ default BufferedMutator getBufferedMutator(TableName tableName) throws IOException {
+ return getBufferedMutator(new BufferedMutatorParams(tableName));
+ }
/**
* Retrieve a {@link BufferedMutator} for performing client-side buffering of writes. The
@@ -193,6 +196,14 @@ public interface Connection extends Abortable, Closeable {
*/
TableBuilder getTableBuilder(TableName tableName, ExecutorService pool);
+ /**
+ * Convert this connection to an {@link AsyncConnection}.
+ *
+ * Usually the same instance will be returned if you call this method multiple times, so this
+ * can be considered a lightweight operation.
+ */
+ AsyncConnection toAsyncConnection();
+
/**
* Retrieve an Hbck implementation to fix an HBase cluster.
* The returned Hbck is not guaranteed to be thread-safe. A new instance should be created by
@@ -207,7 +218,7 @@ public interface Connection extends Abortable, Closeable {
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.HBCK)
default Hbck getHbck() throws IOException {
- throw new UnsupportedOperationException("Not implemented");
+ return FutureUtils.get(toAsyncConnection().getHbck());
}
/**
@@ -228,6 +239,6 @@ public interface Connection extends Abortable, Closeable {
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.HBCK)
default Hbck getHbck(ServerName masterServer) throws IOException {
- throw new UnsupportedOperationException("Not implemented");
+ return toAsyncConnection().getHbck(masterServer);
}
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
index bcb5d1ab50..1945bd0db3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.AuthUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
@@ -211,28 +212,33 @@ public class ConnectionFactory {
* @return Connection object for conf
*/
public static Connection createConnection(Configuration conf, ExecutorService pool,
- final User user) throws IOException {
- String className = conf.get(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL,
- ConnectionImplementation.class.getName());
- Class> clazz;
- try {
- clazz = Class.forName(className);
- } catch (ClassNotFoundException e) {
- throw new IOException(e);
- }
- try {
- // Default HCM#HCI is not accessible; make it so before invoking.
- Constructor> constructor = clazz.getDeclaredConstructor(Configuration.class,
- ExecutorService.class, User.class);
- constructor.setAccessible(true);
- return user.runAs(
- (PrivilegedExceptionAction)() ->
- (Connection) constructor.newInstance(conf, pool, user));
- } catch (Exception e) {
- throw new IOException(e);
+ final User user) throws IOException {
+ Class> clazz = conf.getClass(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL,
+ ConnectionOverAsyncConnection.class, Connection.class);
+ if (clazz != ConnectionOverAsyncConnection.class) {
+ try {
+ // Default HCM#HCI is not accessible; make it so before invoking.
+ Constructor> constructor =
+ clazz.getDeclaredConstructor(Configuration.class, ExecutorService.class, User.class);
+ constructor.setAccessible(true);
+ return user.runAs((PrivilegedExceptionAction) () -> (Connection) constructor
+ .newInstance(conf, pool, user));
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ } else {
+ return FutureUtils.get(createAsyncConnection(conf, user)).toConnection();
}
}
+ /**
+ * Create a {@link ConnectionImplementation}, internal use only.
+ */
+ static ConnectionImplementation createConnectionImpl(Configuration conf, ExecutorService pool,
+ User user) throws IOException {
+ return new ConnectionImplementation(conf, pool, user);
+ }
+
/**
* Call {@link #createAsyncConnection(Configuration)} using default HBaseConfiguration.
* @see #createAsyncConnection(Configuration)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index 307e8e0bee..ede00689c8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -41,8 +41,6 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
@@ -76,7 +74,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
-import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.yetus.audience.InterfaceAudience;
@@ -412,11 +409,6 @@ class ConnectionImplementation implements Connection, Closeable {
}
}
- @Override
- public BufferedMutator getBufferedMutator(TableName tableName) {
- return getBufferedMutator(new BufferedMutatorParams(tableName));
- }
-
@Override
public RegionLocator getRegionLocator(TableName tableName) throws IOException {
return new HRegionLocator(tableName, this);
@@ -472,30 +464,8 @@ class ConnectionImplementation implements Connection, Closeable {
private ExecutorService getThreadPool(int maxThreads, int coreThreads, String nameHint,
BlockingQueue passedWorkQueue) {
// shared HTable thread executor not yet initialized
- if (maxThreads == 0) {
- maxThreads = Runtime.getRuntime().availableProcessors() * 8;
- }
- if (coreThreads == 0) {
- coreThreads = Runtime.getRuntime().availableProcessors() * 8;
- }
- long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60);
- BlockingQueue workQueue = passedWorkQueue;
- if (workQueue == null) {
- workQueue =
- new LinkedBlockingQueue<>(maxThreads *
- conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
- HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
- coreThreads = maxThreads;
- }
- ThreadPoolExecutor tpe = new ThreadPoolExecutor(
- coreThreads,
- maxThreads,
- keepAliveTime,
- TimeUnit.SECONDS,
- workQueue,
- Threads.newDaemonThreadFactory(toString() + nameHint));
- tpe.allowCoreThreadTimeOut(true);
- return tpe;
+ return ConnectionUtils.getThreadPool(conf, maxThreads, coreThreads, () -> toString() + nameHint,
+ passedWorkQueue);
}
private ExecutorService getMetaLookupPool() {
@@ -527,21 +497,10 @@ class ConnectionImplementation implements Connection, Closeable {
private void shutdownPools() {
if (this.cleanupPool && this.batchPool != null && !this.batchPool.isShutdown()) {
- shutdownBatchPool(this.batchPool);
+ ConnectionUtils.shutdownPool(this.batchPool);
}
if (this.metaLookupPool != null && !this.metaLookupPool.isShutdown()) {
- shutdownBatchPool(this.metaLookupPool);
- }
- }
-
- private void shutdownBatchPool(ExecutorService pool) {
- pool.shutdown();
- try {
- if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
- pool.shutdownNow();
- }
- } catch (InterruptedException e) {
- pool.shutdownNow();
+ ConnectionUtils.shutdownPool(this.metaLookupPool);
}
}
@@ -2186,4 +2145,9 @@ class ConnectionImplementation implements Connection, Closeable {
throw new IOException(cause);
}
}
+
+ @Override
+ public AsyncConnection toAsyncConnection() {
+ throw new UnsupportedOperationException();
+ }
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java
new file mode 100644
index 0000000000..b44c3b7285
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.log.HBaseMarkers;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
+/**
+ * The connection implementation based on {@link AsyncConnection}.
+ */
+@InterfaceAudience.Private
+class ConnectionOverAsyncConnection implements Connection {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ConnectionOverAsyncConnection.class);
+
+ private volatile boolean aborted = false;
+
+ private volatile ExecutorService batchPool = null;
+
+ protected final AsyncConnectionImpl conn;
+
+ /**
+ * @deprecated we cannot implement all the related pieces at once, so keep it here for now; will
+ * remove it after we implement the remaining pieces, like Admin, RegionLocator, etc.
+ */
+ @Deprecated
+ private final ConnectionImplementation oldConn;
+
+ private final ConnectionConfiguration connConf;
+
+ ConnectionOverAsyncConnection(AsyncConnectionImpl conn, ConnectionImplementation oldConn) {
+ this.conn = conn;
+ this.oldConn = oldConn;
+ this.connConf = new ConnectionConfiguration(conn.getConfiguration());
+ }
+
+ @Override
+ public void abort(String why, Throwable error) {
+ if (error != null) {
+ LOG.error(HBaseMarkers.FATAL, why, error);
+ } else {
+ LOG.error(HBaseMarkers.FATAL, why);
+ }
+ aborted = true;
+ try {
+ Closeables.close(this, true);
+ } catch (IOException e) {
+ throw new AssertionError(e);
+ }
+ }
+
+ @Override
+ public boolean isAborted() {
+ return aborted;
+ }
+
+ @Override
+ public Configuration getConfiguration() {
+ return conn.getConfiguration();
+ }
+
+ @Override
+ public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
+ return oldConn.getBufferedMutator(params);
+ }
+
+ @Override
+ public RegionLocator getRegionLocator(TableName tableName) throws IOException {
+ return oldConn.getRegionLocator(tableName);
+ }
+
+ @Override
+ public void clearRegionLocationCache() {
+ conn.clearRegionLocationCache();
+ }
+
+ @Override
+ public Admin getAdmin() throws IOException {
+ return oldConn.getAdmin();
+ }
+
+ @Override
+ public void close() throws IOException {
+ conn.close();
+ }
+
+ // Called from AsyncConnectionImpl.close() to avoid an infinite loop, since the close() method
+ // above delegates to AsyncConnection.close().
+ void closeConnImpl() {
+ ExecutorService batchPool = this.batchPool;
+ if (batchPool != null) {
+ ConnectionUtils.shutdownPool(batchPool);
+ this.batchPool = null;
+ }
+ }
+
+ @Override
+ public boolean isClosed() {
+ return conn.isClosed();
+ }
+
+ private ExecutorService getBatchPool() {
+ if (batchPool == null) {
+ synchronized (this) {
+ if (batchPool == null) {
+ int threads = conn.getConfiguration().getInt("hbase.hconnection.threads.max", 256);
+ this.batchPool = ConnectionUtils.getThreadPool(conn.getConfiguration(), threads, threads,
+ () -> toString() + "-shared", null);
+ }
+ }
+ }
+ return this.batchPool;
+ }
+
+ @Override
+ public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) {
+ return new TableBuilderBase(tableName, connConf) {
+
+ @Override
+ public Table build() {
+ ExecutorService p = pool != null ? pool : getBatchPool();
+ return new TableOverAsyncTable(conn,
+ conn.getTableBuilder(tableName).setRpcTimeout(rpcTimeout, TimeUnit.MILLISECONDS)
+ .setReadRpcTimeout(readRpcTimeout, TimeUnit.MILLISECONDS)
+ .setWriteRpcTimeout(writeRpcTimeout, TimeUnit.MILLISECONDS)
+ .setOperationTimeout(operationTimeout, TimeUnit.MILLISECONDS).build(),
+ p);
+ }
+ };
+ }
+
+ @Override
+ public AsyncConnection toAsyncConnection() {
+ return conn;
+ }
+
+ @Override
+ public Hbck getHbck() throws IOException {
+ return FutureUtils.get(conn.getHbck());
+ }
+
+ @Override
+ public Hbck getHbck(ServerName masterServer) throws IOException {
+ return conn.getHbck(masterServer);
+ }
+
+ /**
+ * An identifier that will remain the same for a given connection.
+ */
+ @Override
+ public String toString() {
+ return "connection-over-async-connection-0x" + Integer.toHexString(hashCode());
+ }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index 34c2ec077f..c7daa40749 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -28,11 +28,15 @@ import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
+import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
@@ -45,9 +49,9 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ReflectionUtils;
+import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.DNS;
import org.apache.yetus.audience.InterfaceAudience;
@@ -61,10 +65,8 @@ import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.io.netty.util.Timer;
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
/**
* Utility used by client connections.
@@ -134,68 +136,6 @@ public final class ConnectionUtils {
log.info(sn + " server-side Connection retries=" + retries);
}
- /**
- * A ClusterConnection that will short-circuit RPC making direct invocations against the localhost
- * if the invocation target is 'this' server; save on network and protobuf invocations.
- */
- // TODO This has to still do PB marshalling/unmarshalling stuff. Check how/whether we can avoid.
- @VisibleForTesting // Class is visible so can assert we are short-circuiting when expected.
- public static class ShortCircuitingClusterConnection extends ConnectionImplementation {
- private final ServerName serverName;
- private final AdminService.BlockingInterface localHostAdmin;
- private final ClientService.BlockingInterface localHostClient;
-
- private ShortCircuitingClusterConnection(Configuration conf, ExecutorService pool, User user,
- ServerName serverName, AdminService.BlockingInterface admin,
- ClientService.BlockingInterface client) throws IOException {
- super(conf, pool, user);
- this.serverName = serverName;
- this.localHostAdmin = admin;
- this.localHostClient = client;
- }
-
- @Override
- public AdminService.BlockingInterface getAdmin(ServerName sn) throws IOException {
- return serverName.equals(sn) ? this.localHostAdmin : super.getAdmin(sn);
- }
-
- @Override
- public ClientService.BlockingInterface getClient(ServerName sn) throws IOException {
- return serverName.equals(sn) ? this.localHostClient : super.getClient(sn);
- }
-
- @Override
- public MasterKeepAliveConnection getMaster() throws IOException {
- if (this.localHostClient instanceof MasterService.BlockingInterface) {
- return new ShortCircuitMasterConnection(
- (MasterService.BlockingInterface) this.localHostClient);
- }
- return super.getMaster();
- }
- }
-
- /**
- * Creates a short-circuit connection that can bypass the RPC layer (serialization,
- * deserialization, networking, etc..) when talking to a local server.
- * @param conf the current configuration
- * @param pool the thread pool to use for batch operations
- * @param user the user the connection is for
- * @param serverName the local server name
- * @param admin the admin interface of the local server
- * @param client the client interface of the local server
- * @return an short-circuit connection.
- * @throws IOException if IO failure occurred
- */
- public static ConnectionImplementation createShortCircuitConnection(final Configuration conf,
- ExecutorService pool, User user, final ServerName serverName,
- final AdminService.BlockingInterface admin, final ClientService.BlockingInterface client)
- throws IOException {
- if (user == null) {
- user = UserProvider.instantiate(conf).getCurrent();
- }
- return new ShortCircuitingClusterConnection(conf, pool, user, serverName, admin, client);
- }
-
/**
* Setup the connection class, so that it will not depend on master being online. Used for testing
* @param conf configuration to set
@@ -619,4 +559,38 @@ public final class ConnectionUtils {
}
return future;
}
+
+ static ExecutorService getThreadPool(Configuration conf, int maxThreads, int coreThreads,
+ Supplier threadName, BlockingQueue passedWorkQueue) {
+ // shared HTable thread executor not yet initialized
+ if (maxThreads == 0) {
+ maxThreads = Runtime.getRuntime().availableProcessors() * 8;
+ }
+ if (coreThreads == 0) {
+ coreThreads = Runtime.getRuntime().availableProcessors() * 8;
+ }
+ long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60);
+ BlockingQueue workQueue = passedWorkQueue;
+ if (workQueue == null) {
+ workQueue =
+ new LinkedBlockingQueue<>(maxThreads * conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
+ HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
+ coreThreads = maxThreads;
+ }
+ ThreadPoolExecutor tpe = new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTime,
+ TimeUnit.SECONDS, workQueue, Threads.newDaemonThreadFactory(threadName.get()));
+ tpe.allowCoreThreadTimeOut(true);
+ return tpe;
+ }
+
+ static void shutdownPool(ExecutorService pool) {
+ pool.shutdown();
+ try {
+ if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+ pool.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ pool.shutdownNow();
+ }
+ }
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index b17a670363..8097de237c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -18,6 +18,8 @@
*/
package org.apache.hadoop.hbase.client;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;
+
// DO NOT MAKE USE OF THESE IMPORTS! THEY ARE HERE FOR COPROCESSOR ENDPOINTS ONLY.
// Internally, we use shaded protobuf. This below are part of our public API.
//SEE ABOVE NOTE!
@@ -25,27 +27,44 @@ import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
-
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.io.TimeRange;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
import org.apache.hadoop.hbase.filter.BinaryComparator;
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
@@ -55,27 +74,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequ
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.ReflectionUtils;
-import org.apache.hadoop.hbase.util.Threads;
-
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;
/**
* An implementation of {@link Table}. Used to communicate with a single HBase table.
@@ -102,7 +100,7 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;
*/
@InterfaceAudience.Private
@InterfaceStability.Stable
-public class HTable implements Table {
+class HTable implements Table {
private static final Logger LOG = LoggerFactory.getLogger(HTable.class);
private static final Consistency DEFAULT_CONSISTENCY = Consistency.STRONG;
private final ConnectionImplementation connection;
@@ -223,17 +221,6 @@ public class HTable implements Table {
return this.connection;
}
- @Override
- @Deprecated
- public HTableDescriptor getTableDescriptor() throws IOException {
- HTableDescriptor htd = HBaseAdmin.getHTableDescriptor(tableName, connection, rpcCallerFactory,
- rpcControllerFactory, operationTimeoutMs, readRpcTimeoutMs);
- if (htd != null) {
- return new ImmutableHTableDescriptor(htd);
- }
- return null;
- }
-
@Override
public TableDescriptor getDescriptor() throws IOException {
return HBaseAdmin.getTableDescriptor(tableName, connection, rpcCallerFactory,
@@ -668,29 +655,6 @@ public class HTable implements Table {
callWithRetries(callable, this.operationTimeoutMs);
}
- @Override
- @Deprecated
- public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier,
- final byte [] value, final Put put) throws IOException {
- return doCheckAndPut(row, family, qualifier, CompareOperator.EQUAL.name(), value, null, put);
- }
-
- @Override
- @Deprecated
- public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier,
- final CompareOp compareOp, final byte [] value, final Put put) throws IOException {
- return doCheckAndPut(row, family, qualifier, compareOp.name(), value, null, put);
- }
-
- @Override
- @Deprecated
- public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier,
- final CompareOperator op, final byte [] value, final Put put) throws IOException {
- // The name of the operators in CompareOperator are intentionally those of the
- // operators in the filter's CompareOp enum.
- return doCheckAndPut(row, family, qualifier, op.name(), value, null, put);
- }
-
private boolean doCheckAndPut(final byte[] row, final byte[] family, final byte[] qualifier,
final String opName, final byte[] value, final TimeRange timeRange, final Put put)
throws IOException {
@@ -711,28 +675,6 @@ public class HTable implements Table {
.callWithRetries(callable, this.operationTimeoutMs);
}
- @Override
- @Deprecated
- public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
- final byte[] value, final Delete delete) throws IOException {
- return doCheckAndDelete(row, family, qualifier, CompareOperator.EQUAL.name(), value, null,
- delete);
- }
-
- @Override
- @Deprecated
- public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
- final CompareOp compareOp, final byte[] value, final Delete delete) throws IOException {
- return doCheckAndDelete(row, family, qualifier, compareOp.name(), value, null, delete);
- }
-
- @Override
- @Deprecated
- public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
- final CompareOperator op, final byte[] value, final Delete delete) throws IOException {
- return doCheckAndDelete(row, family, qualifier, op.name(), value, null, delete);
- }
-
private boolean doCheckAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
final String opName, final byte[] value, final TimeRange timeRange, final Delete delete)
throws IOException {
@@ -824,21 +766,6 @@ public class HTable implements Table {
return ((Result)results[0]).getExists();
}
- @Override
- @Deprecated
- public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
- final CompareOp compareOp, final byte [] value, final RowMutations rm)
- throws IOException {
- return doCheckAndMutate(row, family, qualifier, compareOp.name(), value, null, rm);
- }
-
- @Override
- @Deprecated
- public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
- final CompareOperator op, final byte [] value, final RowMutations rm) throws IOException {
- return doCheckAndMutate(row, family, qualifier, op.name(), value, null, rm);
- }
-
@Override
public boolean exists(final Get get) throws IOException {
Result r = get(get, true);
@@ -956,23 +883,6 @@ public class HTable implements Table {
return new RegionCoprocessorRpcChannel(connection, tableName, row);
}
- @Override
- public <T extends Service, R> Map<byte[], R> coprocessorService(final Class<T> service,
- byte[] startKey, byte[] endKey, final Batch.Call<T, R> callable)
- throws ServiceException, Throwable {
- final Map<byte[], R> results = Collections.synchronizedMap(
- new TreeMap<byte[], R>(Bytes.BYTES_COMPARATOR));
- coprocessorService(service, startKey, endKey, callable, new Batch.Callback<R>() {
- @Override
- public void update(byte[] region, byte[] row, R value) {
- if (region != null) {
- results.put(region, value);
- }
- }
- });
- return results;
- }
-
@Override
public <T extends Service, R> void coprocessorService(final Class<T> service,
byte[] startKey, byte[] endKey, final Batch.Call<T, R> callable,
@@ -1028,93 +938,26 @@ public class HTable implements Table {
return unit.convert(rpcTimeoutMs, TimeUnit.MILLISECONDS);
}
- @Override
- @Deprecated
- public int getRpcTimeout() {
- return rpcTimeoutMs;
- }
-
- @Override
- @Deprecated
- public void setRpcTimeout(int rpcTimeout) {
- setReadRpcTimeout(rpcTimeout);
- setWriteRpcTimeout(rpcTimeout);
- }
-
@Override
public long getReadRpcTimeout(TimeUnit unit) {
return unit.convert(readRpcTimeoutMs, TimeUnit.MILLISECONDS);
}
- @Override
- @Deprecated
- public int getReadRpcTimeout() {
- return readRpcTimeoutMs;
- }
-
- @Override
- @Deprecated
- public void setReadRpcTimeout(int readRpcTimeout) {
- this.readRpcTimeoutMs = readRpcTimeout;
- }
-
@Override
public long getWriteRpcTimeout(TimeUnit unit) {
return unit.convert(writeRpcTimeoutMs, TimeUnit.MILLISECONDS);
}
- @Override
- @Deprecated
- public int getWriteRpcTimeout() {
- return writeRpcTimeoutMs;
- }
-
- @Override
- @Deprecated
- public void setWriteRpcTimeout(int writeRpcTimeout) {
- this.writeRpcTimeoutMs = writeRpcTimeout;
- }
-
@Override
public long getOperationTimeout(TimeUnit unit) {
return unit.convert(operationTimeoutMs, TimeUnit.MILLISECONDS);
}
- @Override
- @Deprecated
- public int getOperationTimeout() {
- return operationTimeoutMs;
- }
-
- @Override
- @Deprecated
- public void setOperationTimeout(int operationTimeout) {
- this.operationTimeoutMs = operationTimeout;
- }
-
@Override
public String toString() {
return tableName + ";" + connection;
}
- @Override
- public <R extends Message> Map<byte[], R> batchCoprocessorService(
- Descriptors.MethodDescriptor methodDescriptor, Message request,
- byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
- final Map<byte[], R> results = Collections.synchronizedMap(new TreeMap<byte[], R>(
- Bytes.BYTES_COMPARATOR));
- batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype,
- new Callback<R>() {
- @Override
- public void update(byte[] region, byte[] row, R result) {
- if (region != null) {
- results.put(region, result);
- }
- }
- });
- return results;
- }
-
@Override
public <R extends Message> void batchCoprocessorService(
final Descriptors.MethodDescriptor methodDescriptor, final Message request,
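With the deprecated checkAndPut/checkAndDelete/checkAndMutate overloads removed from HTable above, callers are expected to go through the CheckAndMutateBuilder that already lives on the Table interface. A minimal sketch of the replacement call path (table, row, family, qualifier, expectedValue and put are caller-supplied placeholders, not names from this patch):

    // Sketch only: the builder form that replaces the removed checkAndPut(row, family,
    // qualifier, CompareOperator, value, put) style overloads.
    boolean applied = table.checkAndMutate(row, family)
        .qualifier(qualifier)
        .ifEquals(expectedValue)   // or ifNotExists(), or ifMatches(CompareOperator.GREATER, v)
        .thenPut(put);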
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannelImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannelImpl.java
index 94e7d9a019..138c87fe18 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannelImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannelImpl.java
@@ -57,6 +57,8 @@ class RegionCoprocessorRpcChannelImpl implements RpcChannel {
private final long operationTimeoutNs;
+ private byte[] lastRegion;
+
RegionCoprocessorRpcChannelImpl(AsyncConnectionImpl conn, TableName tableName, RegionInfo region,
byte[] row, long rpcTimeoutNs, long operationTimeoutNs) {
this.conn = conn;
@@ -71,15 +73,13 @@ class RegionCoprocessorRpcChannelImpl implements RpcChannel {
Message responsePrototype, HBaseRpcController controller, HRegionLocation loc,
ClientService.Interface stub) {
CompletableFuture<Message> future = new CompletableFuture<>();
- if (region != null
- && !Bytes.equals(loc.getRegionInfo().getRegionName(), region.getRegionName())) {
- future.completeExceptionally(new DoNotRetryIOException(
- "Region name is changed, expected " + region.getRegionNameAsString() + ", actual "
- + loc.getRegionInfo().getRegionNameAsString()));
+ if (region != null && !Bytes.equals(loc.getRegion().getRegionName(), region.getRegionName())) {
+ future.completeExceptionally(new DoNotRetryIOException("Region name is changed, expected " +
+ region.getRegionNameAsString() + ", actual " + loc.getRegion().getRegionNameAsString()));
return future;
}
CoprocessorServiceRequest csr = CoprocessorRpcUtils.getCoprocessorServiceRequest(method,
- request, row, loc.getRegionInfo().getRegionName());
+ request, row, loc.getRegion().getRegionName());
stub.execService(controller, csr,
new org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback<CoprocessorServiceResponse>() {
@@ -88,6 +88,7 @@ class RegionCoprocessorRpcChannelImpl implements RpcChannel {
if (controller.failed()) {
future.completeExceptionally(controller.getFailed());
} else {
+ lastRegion = resp.getRegion().getValue().toByteArray();
try {
future.complete(CoprocessorRpcUtils.getResponse(resp, responsePrototype));
} catch (IOException e) {
@@ -114,4 +115,8 @@ class RegionCoprocessorRpcChannelImpl implements RpcChannel {
done.run(r);
});
}
+
+ public byte[] getLastRegion() {
+ return lastRegion;
+ }
}
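The new lastRegion/getLastRegion() pair records which region actually served a coprocessor call, since the async path does not otherwise expose it to the blocking Batch.Callback API. A hedged sketch of how a blocking caller can use it, mirroring what the old HTable did per region (channel, service, callable, callback and row are assumed to be in scope):

    // Sketch only, not the patch's exact wiring: run the user's Batch.Call through the
    // channel, then report which region answered.
    T stub = ProtobufUtil.newServiceStub(service, channel); // non-shaded ProtobufUtil helper
    R result = callable.call(stub);
    if (callback != null) {
      callback.update(channel.getLastRegion(), row, result); // region recorded during execService
    }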
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
index 6908424781..5da179dc99 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
@@ -18,30 +18,28 @@
*/
package org.apache.hadoop.hbase.client;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import com.google.protobuf.Service;
+import com.google.protobuf.ServiceException;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
-
import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CompareOperator;
-import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.io.TimeRange;
-import org.apache.yetus.audience.InterfaceAudience;
-
import org.apache.hadoop.hbase.client.coprocessor.Batch;
-import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
+import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.util.Bytes;
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
-import com.google.protobuf.Service;
-import com.google.protobuf.ServiceException;
+import org.apache.yetus.audience.InterfaceAudience;
/**
* Used to communicate with a single HBase table.
@@ -69,23 +67,6 @@ public interface Table extends Closeable {
*/
Configuration getConfiguration();
- /**
- * Gets the {@link org.apache.hadoop.hbase.HTableDescriptor table descriptor} for this table.
- * @throws java.io.IOException if a remote or network exception occurs.
- * @deprecated since 2.0 version and will be removed in 3.0 version.
- * use {@link #getDescriptor()}
- */
- @Deprecated
- default HTableDescriptor getTableDescriptor() throws IOException {
- TableDescriptor descriptor = getDescriptor();
-
- if (descriptor instanceof HTableDescriptor) {
- return (HTableDescriptor)descriptor;
- } else {
- return new HTableDescriptor(descriptor);
- }
- }
-
/**
* Gets the {@link org.apache.hadoop.hbase.client.TableDescriptor table descriptor} for this table.
* @throws java.io.IOException if a remote or network exception occurs.
@@ -129,24 +110,6 @@ public interface Table extends Closeable {
throw new NotImplementedException("Add an implementation!");
}
- /**
- * Test for the existence of columns in the table, as specified by the Gets.
- * This will return an array of booleans. Each value will be true if the related Get matches
- * one or more keys, false if not.
- * This is a server-side call so it prevents any data from being transferred to
- * the client.
- *
- * @param gets the Gets
- * @return Array of boolean. True if the specified Get matches one or more keys, false if not.
- * @throws IOException e
- * @deprecated since 2.0 version and will be removed in 3.0 version.
- * use {@link #exists(List)}
- */
- @Deprecated
- default boolean[] existsAll(List<Get> gets) throws IOException {
- return exists(gets);
- }
-
/**
* Method that does a batch call on Deletes, Gets, Puts, Increments, Appends, RowMutations.
* The ordering of execution of the actions is not defined. Meaning if you do a Put and a
@@ -169,10 +132,14 @@ public interface Table extends Closeable {
/**
* Same as {@link #batch(List, Object[])}, but with a callback.
* @since 0.96.0
+ * @deprecated Please use the batch related methods in {@link AsyncTable} directly if you want to
+ * use a callback. We reuse the callback for coprocessor calls here, and the problem is that
+ * for batch operations, the {@link AsyncTable} does not tell us the region, so in this
+ * method we need an extra region lookup after we get the result, which is not good.
*/
- default <R> void batchCallback(
- final List<? extends Row> actions, final Object[] results, final Batch.Callback<R> callback)
- throws IOException, InterruptedException {
+ @Deprecated
+ default <R> void batchCallback(final List<? extends Row> actions, final Object[] results,
+ final Batch.Callback<R> callback) throws IOException, InterruptedException {
throw new NotImplementedException("Add an implementation!");
}
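For callers migrating off batchCallback, the per-action results are available as futures on AsyncTable. A minimal sketch (conn is an AsyncConnection and actions is the caller's List of Row operations, both assumptions here):

    // Sketch only: AsyncTable#batch returns one CompletableFuture per action, which
    // replaces the Batch.Callback style of this deprecated method.
    AsyncTable<?> asyncTable = conn.getTable(TableName.valueOf("my_table"));
    List<CompletableFuture<Object>> futures = asyncTable.batch(actions);
    for (CompletableFuture<Object> f : futures) {
      f.whenComplete((result, error) -> {
        // handle each action's result or failure here, as the old callback did
      });
    }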
@@ -282,84 +249,6 @@ public interface Table extends Closeable {
throw new NotImplementedException("Add an implementation!");
}
- /**
- * Atomically checks if a row/family/qualifier value matches the expected
- * value. If it does, it adds the put. If the passed value is null, the check
- * is for the lack of column (ie: non-existance)
- *
- * @param row to check
- * @param family column family to check
- * @param qualifier column qualifier to check
- * @param value the expected value
- * @param put data to put if check succeeds
- * @throws IOException e
- * @return true if the new put was executed, false otherwise
- * @deprecated Since 2.0.0. Will be removed in 3.0.0. Use {@link #checkAndMutate(byte[], byte[])}
- */
- @Deprecated
- default boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, byte[] value, Put put)
- throws IOException {
- return checkAndPut(row, family, qualifier, CompareOperator.EQUAL, value, put);
- }
-
- /**
- * Atomically checks if a row/family/qualifier value matches the expected
- * value. If it does, it adds the put. If the passed value is null, the check
- * is for the lack of column (ie: non-existence)
- *
- * The expected value argument of this call is on the left and the current
- * value of the cell is on the right side of the comparison operator.
- *
- * Ie. eg. GREATER operator means expected value > existing <=> add the put.
- *
- * @param row to check
- * @param family column family to check
- * @param qualifier column qualifier to check
- * @param compareOp comparison operator to use
- * @param value the expected value
- * @param put data to put if check succeeds
- * @throws IOException e
- * @return true if the new put was executed, false otherwise
- * @deprecated Since 2.0.0. Will be removed in 3.0.0. Use {@link #checkAndMutate(byte[], byte[])}
- */
- @Deprecated
- default boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
- CompareFilter.CompareOp compareOp, byte[] value, Put put) throws IOException {
- RowMutations mutations = new RowMutations(put.getRow(), 1);
- mutations.add(put);
-
- return checkAndMutate(row, family, qualifier, compareOp, value, mutations);
- }
-
- /**
- * Atomically checks if a row/family/qualifier value matches the expected
- * value. If it does, it adds the put. If the passed value is null, the check
- * is for the lack of column (ie: non-existence)
- *
- * The expected value argument of this call is on the left and the current
- * value of the cell is on the right side of the comparison operator.
- *
- * Ie. eg. GREATER operator means expected value > existing <=> add the put.
- *
- * @param row to check
- * @param family column family to check
- * @param qualifier column qualifier to check
- * @param op comparison operator to use
- * @param value the expected value
- * @param put data to put if check succeeds
- * @throws IOException e
- * @return true if the new put was executed, false otherwise
- * @deprecated Since 2.0.0. Will be removed in 3.0.0. Use {@link #checkAndMutate(byte[], byte[])}
- */
- @Deprecated
- default boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, CompareOperator op,
- byte[] value, Put put) throws IOException {
- RowMutations mutations = new RowMutations(put.getRow(), 1);
- mutations.add(put);
-
- return checkAndMutate(row, family, qualifier, op, value, mutations);
- }
-
/**
* Deletes the specified cells/row.
*
@@ -398,84 +287,6 @@ public interface Table extends Closeable {
throw new NotImplementedException("Add an implementation!");
}
- /**
- * Atomically checks if a row/family/qualifier value matches the expected
- * value. If it does, it adds the delete. If the passed value is null, the
- * check is for the lack of column (ie: non-existance)
- *
- * @param row to check
- * @param family column family to check
- * @param qualifier column qualifier to check
- * @param value the expected value
- * @param delete data to delete if check succeeds
- * @throws IOException e
- * @return true if the new delete was executed, false otherwise
- * @deprecated Since 2.0.0. Will be removed in 3.0.0. Use {@link #checkAndMutate(byte[], byte[])}
- */
- @Deprecated
- default boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
- byte[] value, Delete delete) throws IOException {
- return checkAndDelete(row, family, qualifier, CompareOperator.EQUAL, value, delete);
- }
-
- /**
- * Atomically checks if a row/family/qualifier value matches the expected
- * value. If it does, it adds the delete. If the passed value is null, the
- * check is for the lack of column (ie: non-existence)
- *
- * The expected value argument of this call is on the left and the current
- * value of the cell is on the right side of the comparison operator.
- *
- * Ie. eg. GREATER operator means expected value > existing <=> add the delete.
- *
- * @param row to check
- * @param family column family to check
- * @param qualifier column qualifier to check
- * @param compareOp comparison operator to use
- * @param value the expected value
- * @param delete data to delete if check succeeds
- * @throws IOException e
- * @return true if the new delete was executed, false otherwise
- * @deprecated Since 2.0.0. Will be removed in 3.0.0. Use {@link #checkAndMutate(byte[], byte[])}
- */
- @Deprecated
- default boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
- CompareFilter.CompareOp compareOp, byte[] value, Delete delete) throws IOException {
- RowMutations mutations = new RowMutations(delete.getRow(), 1);
- mutations.add(delete);
-
- return checkAndMutate(row, family, qualifier, compareOp, value, mutations);
- }
-
- /**
- * Atomically checks if a row/family/qualifier value matches the expected
- * value. If it does, it adds the delete. If the passed value is null, the
- * check is for the lack of column (ie: non-existence)
- *
- * The expected value argument of this call is on the left and the current
- * value of the cell is on the right side of the comparison operator.
- *
- * Ie. eg. GREATER operator means expected value > existing <=> add the delete.
- *
- * @param row to check
- * @param family column family to check
- * @param qualifier column qualifier to check
- * @param op comparison operator to use
- * @param value the expected value
- * @param delete data to delete if check succeeds
- * @throws IOException e
- * @return true if the new delete was executed, false otherwise
- * @deprecated Since 2.0.0. Will be removed in 3.0.0. Use {@link #checkAndMutate(byte[], byte[])}
- */
- @Deprecated
- default boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
- CompareOperator op, byte[] value, Delete delete) throws IOException {
- RowMutations mutations = new RowMutations(delete.getRow(), 1);
- mutations.add(delete);
-
- return checkAndMutate(row, family, qualifier, op, value, mutations);
- }
-
/**
* Atomically checks if a row/family/qualifier value matches the expected value. If it does, it
* adds the Put/Delete/RowMutations.
@@ -643,32 +454,35 @@ public interface Table extends Closeable {
}
/**
- * Creates and returns a {@link com.google.protobuf.RpcChannel} instance connected to the
- * table region containing the specified row. The row given does not actually have
- * to exist. Whichever region would contain the row based on start and end keys will
- * be used. Note that the {@code row} parameter is also not passed to the
- * coprocessor handler registered for this protocol, unless the {@code row}
- * is separately passed as an argument in the service request. The parameter
- * here is only used to locate the region used to handle the call.
- *
+ * Creates and returns a {@link com.google.protobuf.RpcChannel} instance connected to the table
+ * region containing the specified row. The row given does not actually have to exist. Whichever
+ * region would contain the row based on start and end keys will be used. Note that the
+ * {@code row} parameter is also not passed to the coprocessor handler registered for this
+ * protocol, unless the {@code row} is separately passed as an argument in the service request.
+ * The parameter here is only used to locate the region used to handle the call.
*
* The obtained {@link com.google.protobuf.RpcChannel} instance can be used to access a published
* coprocessor {@link com.google.protobuf.Service} using standard protobuf service invocations:
*
* @param row The row key used to identify the remote region location
* @return A CoprocessorRpcChannel instance
+ * @deprecated This is too low level, please stop using it. Use the coprocessorService
+ * methods in {@link AsyncTable} instead.
+ * @see Connection#toAsyncConnection()
*/
+ @Deprecated
default CoprocessorRpcChannel coprocessorService(byte[] row) {
throw new NotImplementedException("Add an implementation!");
}
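A hedged sketch of the suggested async replacement for the row-based channel, using a hypothetical generated coprocessor service (MyService, MyRequest and MyResponse are illustrative names, not part of this patch):

    // Sketch only: a single-region coprocessor call through AsyncTable; the row is used
    // purely to locate the region, as with the deprecated channel above.
    AsyncTable<?> asyncTable = connection.toAsyncConnection().getTable(tableName);
    CompletableFuture<MyResponse> future = asyncTable.coprocessorService(
        MyService::newStub, // stub factory taking the RpcChannel
        (stub, controller, done) -> stub.myCall(controller, MyRequest.getDefaultInstance(), done),
        row);
    MyResponse resp = future.get(); // or compose asynchronously instead of blocking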
@@ -678,25 +492,40 @@ public interface Table extends Closeable {
* region spanning the range from the {@code startKey} row to {@code endKey} row (inclusive), and
* invokes the passed {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call} method
* with each {@link com.google.protobuf.Service} instance.
- *
* @param service the protocol buffer {@code Service} implementation to call
- * @param startKey start region selection with region containing this row. If {@code null}, the
- * selection will start with the first table region.
+ * @param startKey start region selection with region containing this row. If {@code null}, the
+ * selection will start with the first table region.
* @param endKey select regions up to and including the region containing this row. If
- * {@code null}, selection will continue through the last table region.
+ * {@code null}, selection will continue through the last table region.
* @param callable this instance's
- * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call}
- * method will be invoked once per table region, using the {@link com.google.protobuf.Service}
- * instance connected to that region.
+ * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call} method will be
+ * invoked once per table region, using the {@link com.google.protobuf.Service} instance
+ * connected to that region.
* @param <T> the {@link com.google.protobuf.Service} subclass to connect to
- * @param <R> Return type for the {@code callable} parameter's {@link
- * org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call} method
+ * @param <R> Return type for the {@code callable} parameter's
+ * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call} method
* @return a map of result values keyed by region name
+ * @deprecated The batch call here references the blocking interface of a protobuf stub, so it
+ * is not possible to do it in an asynchronous way, even if now we are building the
+ * {@link Table} implementation based on the {@link AsyncTable}, which is not good.
+ * Use the coprocessorService methods in {@link AsyncTable} directly instead.
+ * @see Connection#toAsyncConnection()
*/
- default <T extends Service, R> Map<byte[], R> coprocessorService(final Class<T> service,
- byte[] startKey, byte[] endKey, final Batch.Call<T, R> callable)
- throws ServiceException, Throwable {
- throw new NotImplementedException("Add an implementation!");
+ @Deprecated
+ default <T extends Service, R> Map<byte[], R> coprocessorService(final Class<T> service,
+ byte[] startKey, byte[] endKey, final Batch.Call<T, R> callable)
+ throws ServiceException, Throwable {
+ Map<byte[], R> results =
+ Collections.synchronizedMap(new TreeMap<>(Bytes.BYTES_COMPARATOR));
+ coprocessorService(service, startKey, endKey, callable, new Batch.Callback<R>() {
+ @Override
+ public void update(byte[] region, byte[] row, R value) {
+ if (region != null) {
+ results.put(region, value);
+ }
+ }
+ });
+ return results;
}
/**
@@ -704,28 +533,34 @@ public interface Table extends Closeable {
* region spanning the range from the {@code startKey} row to {@code endKey} row (inclusive), and
* invokes the passed {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call} method
* with each {@link Service} instance.
- *
- *
- * The given
+ *
+ * The given
* {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Callback#update(byte[],byte[],Object)}
* method will be called with the return value from each region's
- * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call} invocation.
* @param service the protocol buffer {@code Service} implementation to call
- * @param startKey start region selection with region containing this row. If {@code null}, the
- * selection will start with the first table region.
+ * @param startKey start region selection with region containing this row. If {@code null}, the
+ * selection will start with the first table region.
* @param endKey select regions up to and including the region containing this row. If
- * {@code null}, selection will continue through the last table region.
+ * {@code null}, selection will continue through the last table region.
* @param callable this instance's
- * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call}
- * method will be invoked once per table region, using the {@link Service} instance connected to
- * that region.
+ * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call} method will be
+ * invoked once per table region, using the {@link Service} instance connected to that
+ * region.
* @param <T> the {@link Service} subclass to connect to
- * @param <R> Return type for the {@code callable} parameter's {@link
- * org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call} method
+ * @param <R> Return type for the {@code callable} parameter's
+ * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call} method
+ * @deprecated The batch call here references the blocking interface of a protobuf stub, so it
+ * is not possible to do it in an asynchronous way, even if now we are building the
+ * {@link Table} implementation based on the {@link AsyncTable}, which is not good.
+ * Use the coprocessorService methods in {@link AsyncTable} directly instead.
+ * @see Connection#toAsyncConnection()
*/
- default <T extends Service, R> void coprocessorService(final Class<T> service,
- byte[] startKey, byte[] endKey, final Batch.Call<T, R> callable,
- final Batch.Callback<R> callback) throws ServiceException, Throwable {
+ @Deprecated
+ default <T extends Service, R> void coprocessorService(final Class<T> service, byte[] startKey,
+ byte[] endKey, final Batch.Call<T, R> callable, final Batch.Callback<R> callback)
+ throws ServiceException, Throwable {
throw new NotImplementedException("Add an implementation!");
}
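The range variant has a direct counterpart on AsyncTable as well. A sketch with the same hypothetical MyService, assuming the CoprocessorServiceBuilder/CoprocessorCallback API of the async client (asyncTable, startKey and endKey are assumed to be in scope):

    // Sketch only: per-region callback on AsyncTable replaces this deprecated method.
    asyncTable.<MyService.Stub, MyResponse> coprocessorService(
        MyService::newStub,
        (stub, controller, done) -> stub.myCall(controller, MyRequest.getDefaultInstance(), done),
        new AsyncTable.CoprocessorCallback<MyResponse>() {
          @Override public void onRegionComplete(RegionInfo region, MyResponse resp) { /* per region */ }
          @Override public void onRegionError(RegionInfo region, Throwable error) { /* per region */ }
          @Override public void onComplete() { /* all regions done */ }
          @Override public void onError(Throwable error) { /* overall failure */ }
        })
        .fromRow(startKey)
        .toRow(endKey, true) // inclusive end, matching the blocking method's endKey semantics
        .execute();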
@@ -734,27 +569,37 @@ public interface Table extends Closeable {
* region spanning the range from the {@code startKey} row to {@code endKey} row (inclusive), all
* the invocations to the same region server will be batched into one call. The coprocessor
* service is invoked according to the service instance, method name and parameters.
- *
- * @param methodDescriptor
- * the descriptor for the protobuf service method to call.
- * @param request
- * the method call parameters
- * @param startKey
- * start region selection with region containing this row. If {@code null}, the
+ * @param methodDescriptor the descriptor for the protobuf service method to call.
+ * @param request the method call parameters
+ * @param startKey start region selection with region containing this row. If {@code null}, the
* selection will start with the first table region.
- * @param endKey
- * select regions up to and including the region containing this row. If {@code null},
- * selection will continue through the last table region.
- * @param responsePrototype
- * the proto type of the response of the method in Service.
- * @param <R>
- * the response type for the coprocessor Service method
+ * @param endKey select regions up to and including the region containing this row. If
+ * {@code null}, selection will continue through the last table region.
+ * @param responsePrototype the proto type of the response of the method in Service.
+ * @param <R> the response type for the coprocessor Service method
* @return a map of result values keyed by region name
+ * @deprecated The batch call here references the blocking interface of a protobuf stub, so it
+ * is not possible to do it in an asynchronous way, even if now we are building the
+ * {@link Table} implementation based on the {@link AsyncTable}, which is not good.
+ * Use the coprocessorService methods in {@link AsyncTable} directly instead.
+ * @see Connection#toAsyncConnection()
*/
+ @Deprecated
default <R extends Message> Map<byte[], R> batchCoprocessorService(
- Descriptors.MethodDescriptor methodDescriptor, Message request,
- byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
- throw new NotImplementedException("Add an implementation!");
+ Descriptors.MethodDescriptor methodDescriptor, Message request, byte[] startKey,
+ byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
+ final Map<byte[], R> results =
+ Collections.synchronizedMap(new TreeMap<>(Bytes.BYTES_COMPARATOR));
+ batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype,
+ new Callback<R>() {
+ @Override
+ public void update(byte[] region, byte[] row, R result) {
+ if (region != null) {
+ results.put(region, result);
+ }
+ }
+ });
+ return results;
}
/**
@@ -762,24 +607,27 @@ public interface Table extends Closeable {
* region spanning the range from the {@code startKey} row to {@code endKey} row (inclusive), all
* the invocations to the same region server will be batched into one call. The coprocessor
* service is invoked according to the service instance, method name and parameters.
- *
*
* The given
* {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Callback#update(byte[],byte[],Object)}
* method will be called with the return value from each region's invocation.
*
- *
* @param methodDescriptor the descriptor for the protobuf service method to call.
* @param request the method call parameters
- * @param startKey start region selection with region containing this row.
- * If {@code null}, the selection will start with the first table region.
- * @param endKey select regions up to and including the region containing this row.
- * If {@code null}, selection will continue through the last table region.
+ * @param startKey start region selection with region containing this row. If {@code null}, the
+ * selection will start with the first table region.
+ * @param endKey select regions up to and including the region containing this row. If
+ * {@code null}, selection will continue through the last table region.
* @param responsePrototype the proto type of the response of the method in Service.
* @param callback callback to invoke with the response for each region
- * @param <R>
- * the response type for the coprocessor Service method
+ * @param <R> the response type for the coprocessor Service method
+ * @deprecated The batch call here references the blocking interface of a protobuf stub, so it
+ * is not possible to do it in an asynchronous way, even if now we are building the
+ * {@link Table} implementation based on the {@link AsyncTable}, which is not good.
+ * Use the coprocessorService methods in {@link AsyncTable} directly instead.
+ * @see Connection#toAsyncConnection()
*/
+ @Deprecated
default <R extends Message> void batchCoprocessorService(
Descriptors.MethodDescriptor methodDescriptor, Message request, byte[] startKey,
byte[] endKey, R responsePrototype, Batch.Callback<R> callback)
@@ -787,58 +635,6 @@ public interface Table extends Closeable {
throw new NotImplementedException("Add an implementation!");
}
- /**
- * Atomically checks if a row/family/qualifier value matches the expected value.
- * If it does, it performs the row mutations. If the passed value is null, the check
- * is for the lack of column (ie: non-existence)
- *
- * The expected value argument of this call is on the left and the current
- * value of the cell is on the right side of the comparison operator.
- *
- * Ie. eg. GREATER operator means expected value > existing <=> perform row mutations.
- *
- * @param row to check
- * @param family column family to check
- * @param qualifier column qualifier to check
- * @param compareOp the comparison operator
- * @param value the expected value
- * @param mutation mutations to perform if check succeeds
- * @throws IOException e
- * @return true if the new put was executed, false otherwise
- * @deprecated Since 2.0.0. Will be removed in 3.0.0. Use {@link #checkAndMutate(byte[], byte[])}
- */
- @Deprecated
- default boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
- CompareFilter.CompareOp compareOp, byte[] value, RowMutations mutation) throws IOException {
- throw new NotImplementedException("Add an implementation!");
- }
-
- /**
- * Atomically checks if a row/family/qualifier value matches the expected value.
- * If it does, it performs the row mutations. If the passed value is null, the check
- * is for the lack of column (ie: non-existence)
- *
- * The expected value argument of this call is on the left and the current
- * value of the cell is on the right side of the comparison operator.
- *
- * Ie. eg. GREATER operator means expected value > existing <=> perform row mutations.
- *
- * @param row to check
- * @param family column family to check
- * @param qualifier column qualifier to check
- * @param op the comparison operator
- * @param value the expected value
- * @param mutation mutations to perform if check succeeds
- * @throws IOException e
- * @return true if the new put was executed, false otherwise
- * @deprecated Since 2.0.0. Will be removed in 3.0.0. Use {@link #checkAndMutate(byte[], byte[])}
- */
- @Deprecated
- default boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op,
- byte[] value, RowMutations mutation) throws IOException {
- throw new NotImplementedException("Add an implementation!");
- }
-
/**
* Get timeout of each rpc request in this Table instance. It will be overridden by a more
* specific rpc timeout config such as readRpcTimeout or writeRpcTimeout.
@@ -851,36 +647,6 @@ public interface Table extends Closeable {
throw new NotImplementedException("Add an implementation!");
}
- /**
- * Get timeout (millisecond) of each rpc request in this Table instance.
- *
- * @return Currently configured read timeout
- * @deprecated use {@link #getReadRpcTimeout(TimeUnit)} or
- * {@link #getWriteRpcTimeout(TimeUnit)} instead
- */
- @Deprecated
- default int getRpcTimeout() {
- return (int)getRpcTimeout(TimeUnit.MILLISECONDS);
- }
-
- /**
- * Set timeout (millisecond) of each rpc request in operations of this Table instance, will
- * override the value of hbase.rpc.timeout in configuration.
- * If a rpc request waiting too long, it will stop waiting and send a new request to retry until
- * retries exhausted or operation timeout reached.
- *
- * NOTE: This will set both the read and write timeout settings to the provided value.
- *
- * @param rpcTimeout the timeout of each rpc request in millisecond.
- *
- * @deprecated Use setReadRpcTimeout or setWriteRpcTimeout instead
- */
- @Deprecated
- default void setRpcTimeout(int rpcTimeout) {
- setReadRpcTimeout(rpcTimeout);
- setWriteRpcTimeout(rpcTimeout);
- }
-
/**
* Get timeout of each rpc read request in this Table instance.
* @param unit the unit of time the timeout to be represented in
@@ -890,30 +656,6 @@ public interface Table extends Closeable {
throw new NotImplementedException("Add an implementation!");
}
- /**
- * Get timeout (millisecond) of each rpc read request in this Table instance.
- * @deprecated since 2.0 and will be removed in 3.0 version
- * use {@link #getReadRpcTimeout(TimeUnit)} instead
- */
- @Deprecated
- default int getReadRpcTimeout() {
- return (int)getReadRpcTimeout(TimeUnit.MILLISECONDS);
- }
-
- /**
- * Set timeout (millisecond) of each rpc read request in operations of this Table instance, will
- * override the value of hbase.rpc.read.timeout in configuration.
- * If a rpc read request waiting too long, it will stop waiting and send a new request to retry
- * until retries exhausted or operation timeout reached.
- *
- * @param readRpcTimeout the timeout for read rpc request in milliseconds
- * @deprecated since 2.0.0, use {@link TableBuilder#setReadRpcTimeout} instead
- */
- @Deprecated
- default void setReadRpcTimeout(int readRpcTimeout) {
- throw new NotImplementedException("Add an implementation!");
- }
-
/**
* Get timeout of each rpc write request in this Table instance.
* @param unit the unit of time the timeout to be represented in
@@ -923,30 +665,6 @@ public interface Table extends Closeable {
throw new NotImplementedException("Add an implementation!");
}
- /**
- * Get timeout (millisecond) of each rpc write request in this Table instance.
- * @deprecated since 2.0 and will be removed in 3.0 version
- * use {@link #getWriteRpcTimeout(TimeUnit)} instead
- */
- @Deprecated
- default int getWriteRpcTimeout() {
- return (int)getWriteRpcTimeout(TimeUnit.MILLISECONDS);
- }
-
- /**
- * Set timeout (millisecond) of each rpc write request in operations of this Table instance, will
- * override the value of hbase.rpc.write.timeout in configuration.
- * If a rpc write request waiting too long, it will stop waiting and send a new request to retry
- * until retries exhausted or operation timeout reached.
- *
- * @param writeRpcTimeout the timeout for write rpc request in milliseconds
- * @deprecated since 2.0.0, use {@link TableBuilder#setWriteRpcTimeout} instead
- */
- @Deprecated
- default void setWriteRpcTimeout(int writeRpcTimeout) {
- throw new NotImplementedException("Add an implementation!");
- }
-
/**
* Get timeout of each operation in Table instance.
* @param unit the unit of time the timeout to be represented in
@@ -955,30 +673,4 @@ public interface Table extends Closeable {
default long getOperationTimeout(TimeUnit unit) {
throw new NotImplementedException("Add an implementation!");
}
-
- /**
- * Get timeout (millisecond) of each operation for in Table instance.
- * @deprecated since 2.0 and will be removed in 3.0 version
- * use {@link #getOperationTimeout(TimeUnit)} instead
- */
- @Deprecated
- default int getOperationTimeout() {
- return (int)getOperationTimeout(TimeUnit.MILLISECONDS);
- }
-
- /**
- * Set timeout (millisecond) of each operation in this Table instance, will override the value
- * of hbase.client.operation.timeout in configuration.
- * Operation timeout is a top-level restriction that makes sure a blocking method will not be
- * blocked more than this. In each operation, if rpc request fails because of timeout or
- * other reason, it will retry until success or throw a RetriesExhaustedException. But if the
- * total time being blocking reach the operation timeout before retries exhausted, it will break
- * early and throw SocketTimeoutException.
- * @param operationTimeout the total timeout of each operation in millisecond.
- * @deprecated since 2.0.0, use {@link TableBuilder#setOperationTimeout} instead
- */
- @Deprecated
- default void setOperationTimeout(int operationTimeout) {
- throw new NotImplementedException("Add an implementation!");
- }
}
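The per-instance timeout getters and setters removed from the interface were already superseded by TableBuilder, which the deprecation notes point at. A minimal sketch of fixing the timeouts at construction time:

    // Sketch only: timeouts are configured when the Table is built rather than mutated later.
    try (Table table = connection.getTableBuilder(TableName.valueOf("my_table"), null)
        .setOperationTimeout(60000) // total per-call budget, in milliseconds
        .setReadRpcTimeout(10000)   // each read RPC
        .setWriteRpcTimeout(10000)  // each write RPC
        .build()) {
      // use the table
    }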
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java
new file mode 100644
index 0000000000..790264a04e
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java
@@ -0,0 +1,489 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.Message;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+import com.google.protobuf.ServiceException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CompareOperator;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext;
+import org.apache.hadoop.hbase.client.coprocessor.Batch.Call;
+import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
+import org.apache.hadoop.hbase.io.TimeRange;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.primitives.Booleans;
+
+/**
+ * The table implementation based on {@link AsyncTable}.
+ */
+@InterfaceAudience.Private
+class TableOverAsyncTable implements Table {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TableOverAsyncTable.class);
+
+ private final AsyncConnectionImpl conn;
+
+ private final AsyncTable<?> table;
+
+ private final ExecutorService pool;
+
+ TableOverAsyncTable(AsyncConnectionImpl conn, AsyncTable<?> table, ExecutorService pool) {
+ this.conn = conn;
+ this.table = table;
+ this.pool = pool;
+ }
+
+ @Override
+ public TableName getName() {
+ return table.getName();
+ }
+
+ @Override
+ public Configuration getConfiguration() {
+ return table.getConfiguration();
+ }
+
+ @Override
+ public TableDescriptor getDescriptor() throws IOException {
+ return FutureUtils.get(conn.getAdmin().getDescriptor(getName()));
+ }
+
+ @Override
+ public boolean exists(Get get) throws IOException {
+ return FutureUtils.get(table.exists(get));
+ }
+
+ @Override
+ public boolean[] exists(List<Get> gets) throws IOException {
+ return Booleans.toArray(FutureUtils.get(table.existsAll(gets)));
+ }
+
+ @Override
+ public void batch(List<? extends Row> actions, Object[] results)
+ throws IOException, InterruptedException {
+ List