diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java index eb9842f..99da1be 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java @@ -21,7 +21,6 @@ import java.util.List; import java.util.concurrent.ExecutorService; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HTableDescriptor; @@ -47,7 +46,7 @@ * Connections skip RPC if request is to local server. */ @InterfaceAudience.Private -@SuppressWarnings("deprecation") +@Deprecated //NOTE: DO NOT make this class public. It was made package-private on purpose. abstract class ConnectionAdapter implements ClusterConnection { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java index d04ae76..647295e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java @@ -22,14 +22,15 @@ import java.util.concurrent.ExecutorService; import org.apache.commons.logging.Log; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.security.UserProvider; import com.google.common.annotations.VisibleForTesting; @@ -112,6 +113,7 @@ public static void setServerSideHConnectionRetriesConfig( * @param client the client interface of the local server * @return an adapted/decorated HConnection */ + @Deprecated public static ClusterConnection createShortCircuitHConnection(final Connection conn, final ServerName serverName, final AdminService.BlockingInterface admin, final ClientService.BlockingInterface client) { @@ -131,6 +133,39 @@ public static ClusterConnection createShortCircuitHConnection(final Connection c } /** + * Creates a short-circuit connection that can bypass the RPC layer (serialization, + * deserialization, networking, etc..) when talking to a local server. + * @param conf the current configuration + * @param pool the thread pool to use for batch operations + * @param user the user the connection is for + * @param serverName the local server name + * @param admin the admin interface of the local server + * @param client the client interface of the local server + * @return a short-circuit connection. + * @throws IOException + */ + public static ClusterConnection createShortCircuitConnection(final Configuration conf, + ExecutorService pool, User user, final ServerName serverName, + final AdminService.BlockingInterface admin, final ClientService.BlockingInterface client) + throws IOException { + if (user == null) { + user = UserProvider.instantiate(conf).getCurrent(); + } + return new ConnectionManager.HConnectionImplementation(conf, false, pool, user) { + @Override + public AdminService.BlockingInterface getAdmin(ServerName sn, boolean getMaster) + throws IOException { + return serverName.equals(sn) ? admin : super.getAdmin(sn, getMaster); + } + + @Override + public ClientService.BlockingInterface getClient(ServerName sn) throws IOException { + return serverName.equals(sn) ? client : super.getClient(sn); + } + }; + } + + /** * Setup the connection class, so that it will not depend on master being online. Used for testing * @param conf configuration to set */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index b388111..f8451c9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -79,7 +79,6 @@ import org.apache.hadoop.hbase.ZNodeClearer; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.ClusterConnection; -import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.ConnectionUtils; import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; import org.apache.hadoop.hbase.conf.ConfigurationManager; @@ -665,7 +664,7 @@ public boolean registerService(Service instance) { /** * Create a 'smarter' HConnection, one that is capable of by-passing RPC if the request is to - * the local server. Safe to use going to local or remote server. + * the local server. Safe to use going to local or remote server. * Create this instance in a method can be intercepted and mocked in tests. * @throws IOException */ @@ -674,8 +673,8 @@ protected ClusterConnection createClusterConnection() throws IOException { // Create a cluster connection that when appropriate, can short-circuit and go directly to the // local server if the request is to the local server bypassing RPC. Can be used for both local // and remote invocations. - return ConnectionUtils.createShortCircuitHConnection( - ConnectionFactory.createConnection(conf), serverName, rpcServices, rpcServices); + return ConnectionUtils.createShortCircuitConnection(conf, null, userProvider.getCurrent(), + serverName, rpcServices, rpcServices); } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestShortCircuitConnection.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestShortCircuitConnection.java new file mode 100644 index 0000000..3707865 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestShortCircuitConnection.java @@ -0,0 +1,85 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; + +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.RSRpcServices; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(MediumTests.class) +public class TestShortCircuitConnection { + + private final static HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + UTIL.startMiniCluster(1); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + UTIL.shutdownMiniCluster(); + } + + @Test + @SuppressWarnings("deprecation") + public void testShortCircuitConnection() throws IOException, InterruptedException { + String tnAsString = "testShortCircuitConnection"; + TableName tn = TableName.valueOf(tnAsString); + HTableDescriptor htd = UTIL.createTableDescriptor(tnAsString); + HColumnDescriptor hcd = new HColumnDescriptor(Bytes.toBytes("cf")); + htd.addFamily(hcd); + UTIL.createTable(htd, null); + HRegionServer regionServer = UTIL.getRSForFirstRegionInTable(tn); + ClusterConnection connection = regionServer.getConnection(); + HTableInterface tableIf = connection.getTable(tn); + assertTrue(tableIf instanceof HTable); + HTable table = (HTable) tableIf; + assertTrue(table.getConnection() == connection); + AdminService.BlockingInterface admin = connection.getAdmin(regionServer.getServerName()); + ClientService.BlockingInterface client = connection.getClient(regionServer.getServerName()); + assertTrue(admin instanceof RSRpcServices); + assertTrue(client instanceof RSRpcServices); + ServerName anotherSn = ServerName.valueOf(regionServer.getServerName().getHostAndPort(), + EnvironmentEdgeManager.currentTime()); + admin = connection.getAdmin(anotherSn); + client = connection.getClient(anotherSn); + assertFalse(admin instanceof RSRpcServices); + assertFalse(client instanceof RSRpcServices); + assertTrue(connection.getAdmin().getConnection() == connection); + } +}