diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java index 323915b..6ad844b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.client; import java.io.IOException; +import java.lang.reflect.Constructor; import java.util.Random; import java.util.concurrent.ExecutorService; @@ -30,6 +31,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.security.UserProvider; import com.google.common.annotations.VisibleForTesting; @@ -106,30 +108,37 @@ public static void setServerSideHConnectionRetriesConfig( } /** - * Adapt a HConnection so that it can bypass the RPC layer (serialization, - * deserialization, networking, etc..) -- i.e. short-circuit -- when talking to a local server. - * @param conn the connection to adapt + * Creates an instance of ShortCircuitConnection that can bypass the RPC layer (serialization, + * deserialization, networking, etc..) when talking to a local server. + * @param conf the current configuration * @param serverName the local server name * @param admin the admin interface of the local server * @param client the client interface of the local server - * @return an adapted/decorated HConnection + * @return an instance of ShortCircuitConnection + * @throws IOException */ - public static ClusterConnection createShortCircuitHConnection(final Connection conn, - final ServerName serverName, final AdminService.BlockingInterface admin, - final ClientService.BlockingInterface client) { - return new ConnectionAdapter(conn) { - @Override - public AdminService.BlockingInterface getAdmin( - ServerName sn, boolean getMaster) throws IOException { - return serverName.equals(sn) ? admin : super.getAdmin(sn, getMaster); - } - - @Override - public ClientService.BlockingInterface getClient( - ServerName sn) throws IOException { - return serverName.equals(sn) ? client : super.getClient(sn); - } - }; + public static ClusterConnection createShortCircuitConnection(final Configuration conf, + final ServerName serverName, final AdminService.BlockingInterface admin, + final ClientService.BlockingInterface client) throws IOException { + UserProvider provider = UserProvider.instantiate(conf); + User user = provider.getCurrent(); + Class clazz; + try { + clazz = Class.forName(ShortCircuitConnection.class.getName()); + } catch (ClassNotFoundException e) { + throw new IOException(e); + } + try { + // Default HCM#HCI is not accessible; make it so before invoking. + Constructor constructor = clazz.getDeclaredConstructor(Configuration.class, + ExecutorService.class, User.class, ServerName.class, AdminService.BlockingInterface.class, + ClientService.BlockingInterface.class); + constructor.setAccessible(true); + return (ShortCircuitConnection) constructor.newInstance(conf, null, user, serverName, admin, + client); + } catch (Exception e) { + throw new IOException(e); + } } /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitConnection.java new file mode 100644 index 0000000..45b372e --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitConnection.java @@ -0,0 +1,65 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.util.concurrent.ExecutorService; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; +import org.apache.hadoop.hbase.security.User; + +/** + * The short-circuit connection that can bypass the RPC layer (serialization, deserialization, + * networking, etc..) when talking to a local server. It can be used for both local and remote + * invocations. + */ +@InterfaceAudience.Private +public class ShortCircuitConnection extends ConnectionImplementation { + + private ServerName serverName; + private AdminService.BlockingInterface admin; + private ClientService.BlockingInterface client; + + protected ShortCircuitConnection(Configuration conf, ExecutorService pool, User user, + ServerName serverName, AdminService.BlockingInterface admin, + ClientService.BlockingInterface client) throws IOException { + super(conf, pool, user); + this.serverName = serverName; + this.admin = admin; + this.client = client; + } + + @Override + public AdminService.BlockingInterface getAdmin(ServerName sn, boolean getMaster) + throws IOException { + return admin != null && serverName != null && serverName.equals(sn) ? admin : super.getAdmin( + sn, getMaster); + } + + @Override + public ClientService.BlockingInterface getClient(ServerName sn) throws IOException { + return client != null && serverName != null && serverName.equals(sn) ? client : super + .getClient(sn); + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 315659a..27da311 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -78,7 +78,6 @@ import org.apache.hadoop.hbase.ZNodeClearer; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.ClusterConnection; -import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.ConnectionUtils; import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; import org.apache.hadoop.hbase.conf.ConfigurationManager; @@ -675,7 +674,7 @@ public boolean registerService(Service instance) { /** * Create a 'smarter' HConnection, one that is capable of by-passing RPC if the request is to - * the local server. Safe to use going to local or remote server. + * the local server. Safe to use going to local or remote server. * Create this instance in a method can be intercepted and mocked in tests. * @throws IOException */ @@ -684,8 +683,8 @@ protected ClusterConnection createClusterConnection() throws IOException { // Create a cluster connection that when appropriate, can short-circuit and go directly to the // local server if the request is to the local server bypassing RPC. Can be used for both local // and remote invocations. - return ConnectionUtils.createShortCircuitHConnection( - ConnectionFactory.createConnection(conf), serverName, rpcServices, rpcServices); + return ConnectionUtils + .createShortCircuitConnection(conf, serverName, rpcServices, rpcServices); } /**