From 7b04466dc8acc910df67b1cf9c3e74004ac51a05 Mon Sep 17 00:00:00 2001 From: Andrew Purtell Date: Mon, 25 Aug 2014 15:14:23 -0700 Subject: [PATCH] HBASE-11766 Backdoor CoprocessorHConnection is no longer being used for local writes --- .../hadoop/hbase/client/HConnectionManager.java | 2 +- .../hbase/client/CoprocessorHConnection.java | 328 +++------------------ 2 files changed, 43 insertions(+), 287 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java index d76ae2a..65a41eb 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java @@ -564,7 +564,7 @@ public class HConnectionManager { @edu.umd.cs.findbugs.annotations.SuppressWarnings( value="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION", justification="Access to the conncurrent hash map is under a lock so should be fine.") - static class HConnectionImplementation implements HConnection, Closeable { + public static class HConnectionImplementation implements HConnection, Closeable { static final Log LOG = LogFactory.getLog(HConnectionImplementation.class); private final long pause; private final int numTries; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java index c3e3237..789db5e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java @@ -18,24 +18,13 @@ package org.apache.hadoop.hbase.client; import java.io.IOException; -import java.util.List; -import java.util.concurrent.ExecutorService; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CoprocessorEnvironment; -import org.apache.hadoop.hbase.HRegionLocation; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.ZooKeeperConnectionException; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HConnectionManager; -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.Row; -import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; +import org.apache.hadoop.hbase.client.HConnectionManager.HConnectionImplementation; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.ipc.RpcServerInterface; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; @@ -43,6 +32,7 @@ import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import com.google.protobuf.BlockingRpcChannel; @@ -62,7 +52,7 @@ import com.google.protobuf.ServiceException; */ @InterfaceAudience.Private @InterfaceStability.Evolving -public class CoprocessorHConnection implements HConnection { +public class CoprocessorHConnection extends HConnectionImplementation { /** * Create an unmanaged {@link HConnection} based on the environment in which we are running the @@ -70,50 +60,74 @@ public class CoprocessorHConnection implements HConnection { * cleanup mechanisms since we own everything). * @param env environment hosting the {@link HConnection} * @return an unmanaged {@link HConnection}. - * @throws IOException if we cannot create the basic connection + * @throws IOException if we cannot create the connection */ public static HConnection getConnectionForEnvironment(CoprocessorEnvironment env) throws IOException { - HConnection connection = HConnectionManager.createConnection(env.getConfiguration()); // this bit is a little hacky - just trying to get it going for the moment if (env instanceof RegionCoprocessorEnvironment) { RegionCoprocessorEnvironment e = (RegionCoprocessorEnvironment) env; RegionServerServices services = e.getRegionServerServices(); if (services instanceof HRegionServer) { - return new CoprocessorHConnection(connection, (HRegionServer) services); + return new CoprocessorHConnection((HRegionServer) services); } } - return connection; + return HConnectionManager.createConnection(env.getConfiguration()); } - private HConnection delegate; - private ServerName serverName; - private HRegionServer server; + private final ServerName serverName; + private final HRegionServer server; - public CoprocessorHConnection(HConnection delegate, HRegionServer server) { + /** + * Legacy constructor + * @param delegate + * @param server + * @throws IOException if we cannot create the connection + * @deprecated delegate is not used + */ + @Deprecated + public CoprocessorHConnection(HConnection delegate, HRegionServer server) + throws IOException { + this(server); + } + + /** + * Constructor that uses server configuration + * @param server + * @throws IOException if we cannot create the connection + */ + public CoprocessorHConnection(HRegionServer server) throws IOException { + this(server.getConfiguration(), server); + } + + /** + * Constructor that accepts custom configuration + * @param conf + * @param server + * @throws IOException if we cannot create the connection + */ + public CoprocessorHConnection(Configuration conf, HRegionServer server) throws IOException { + super(conf, false, null, UserProvider.instantiate(conf).getCurrent()); this.server = server; this.serverName = server.getServerName(); - this.delegate = delegate; } + @Override public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService.BlockingInterface getClient(ServerName serverName) throws IOException { // client is trying to reach off-server, so we can't do anything special if (!this.serverName.equals(serverName)) { - return delegate.getClient(serverName); + return super.getClient(serverName); } // the client is attempting to write to the same regionserver, we can short-circuit to our // local regionserver final BlockingService blocking = ClientService.newReflectiveBlockingService(this.server); final RpcServerInterface rpc = this.server.getRpcServer(); - - final MonitoredRPCHandler status = - TaskMonitor.get().createRPCStatus(Thread.currentThread().getName()); + final MonitoredRPCHandler status = TaskMonitor.get().createRPCStatus(Thread.currentThread() + .getName()); status.pause("Setting up server-local call"); - final long timestamp = EnvironmentEdgeManager.currentTimeMillis(); BlockingRpcChannel channel = new BlockingRpcChannel() { - @Override public Message callBlockingMethod(MethodDescriptor method, RpcController controller, Message request, Message responsePrototype) throws ServiceException { @@ -127,262 +141,4 @@ public class CoprocessorHConnection implements HConnection { }; return ClientService.newBlockingStub(channel); } - - public void abort(String why, Throwable e) { - delegate.abort(why, e); - } - - public boolean isAborted() { - return delegate.isAborted(); - } - - public Configuration getConfiguration() { - return delegate.getConfiguration(); - } - - public HTableInterface getTable(String tableName) throws IOException { - return delegate.getTable(tableName); - } - - public HTableInterface getTable(byte[] tableName) throws IOException { - return delegate.getTable(tableName); - } - - public HTableInterface getTable(TableName tableName) throws IOException { - return delegate.getTable(tableName); - } - - public HTableInterface getTable(String tableName, ExecutorService pool) throws IOException { - return delegate.getTable(tableName, pool); - } - - public HTableInterface getTable(byte[] tableName, ExecutorService pool) throws IOException { - return delegate.getTable(tableName, pool); - } - - public HTableInterface getTable(TableName tableName, ExecutorService pool) throws IOException { - return delegate.getTable(tableName, pool); - } - - public boolean isMasterRunning() throws MasterNotRunningException, ZooKeeperConnectionException { - return delegate.isMasterRunning(); - } - - public boolean isTableEnabled(TableName tableName) throws IOException { - return delegate.isTableEnabled(tableName); - } - - public boolean isTableEnabled(byte[] tableName) throws IOException { - return delegate.isTableEnabled(tableName); - } - - public boolean isTableDisabled(TableName tableName) throws IOException { - return delegate.isTableDisabled(tableName); - } - - public boolean isTableDisabled(byte[] tableName) throws IOException { - return delegate.isTableDisabled(tableName); - } - - public boolean isTableAvailable(TableName tableName) throws IOException { - return delegate.isTableAvailable(tableName); - } - - public boolean isTableAvailable(byte[] tableName) throws IOException { - return delegate.isTableAvailable(tableName); - } - - public boolean isTableAvailable(TableName tableName, byte[][] splitKeys) throws IOException { - return delegate.isTableAvailable(tableName, splitKeys); - } - - public boolean isTableAvailable(byte[] tableName, byte[][] splitKeys) throws IOException { - return delegate.isTableAvailable(tableName, splitKeys); - } - - public HTableDescriptor[] listTables() throws IOException { - return delegate.listTables(); - } - - public String[] getTableNames() throws IOException { - return delegate.getTableNames(); - } - - public TableName[] listTableNames() throws IOException { - return delegate.listTableNames(); - } - - public HTableDescriptor getHTableDescriptor(TableName tableName) throws IOException { - return delegate.getHTableDescriptor(tableName); - } - - public HTableDescriptor getHTableDescriptor(byte[] tableName) throws IOException { - return delegate.getHTableDescriptor(tableName); - } - - public HRegionLocation locateRegion(TableName tableName, byte[] row) throws IOException { - return delegate.locateRegion(tableName, row); - } - - public HRegionLocation locateRegion(byte[] tableName, byte[] row) throws IOException { - return delegate.locateRegion(tableName, row); - } - - public void clearRegionCache() { - delegate.clearRegionCache(); - } - - public void clearRegionCache(TableName tableName) { - delegate.clearRegionCache(tableName); - } - - public void clearRegionCache(byte[] tableName) { - delegate.clearRegionCache(tableName); - } - - public HRegionLocation relocateRegion(TableName tableName, byte[] row) throws IOException { - return delegate.relocateRegion(tableName, row); - } - - public HRegionLocation relocateRegion(byte[] tableName, byte[] row) throws IOException { - return delegate.relocateRegion(tableName, row); - } - - public void updateCachedLocations(TableName tableName, byte[] rowkey, Object exception, - HRegionLocation source) { - delegate.updateCachedLocations(tableName, rowkey, exception, source); - } - - public void updateCachedLocations(byte[] tableName, byte[] rowkey, Object exception, - HRegionLocation source) { - delegate.updateCachedLocations(tableName, rowkey, exception, source); - } - - public HRegionLocation locateRegion(byte[] regionName) throws IOException { - return delegate.locateRegion(regionName); - } - - public List locateRegions(TableName tableName) throws IOException { - return delegate.locateRegions(tableName); - } - - public List locateRegions(byte[] tableName) throws IOException { - return delegate.locateRegions(tableName); - } - - public List - locateRegions(TableName tableName, boolean useCache, boolean offlined) throws IOException { - return delegate.locateRegions(tableName, useCache, offlined); - } - - public List locateRegions(byte[] tableName, boolean useCache, boolean offlined) - throws IOException { - return delegate.locateRegions(tableName, useCache, offlined); - } - - public org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService.BlockingInterface getMaster() - throws IOException { - return delegate.getMaster(); - } - - public org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface - getAdmin(ServerName serverName) throws IOException { - return delegate.getAdmin(serverName); - } - - public org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface - getAdmin(ServerName serverName, boolean getMaster) throws IOException { - return delegate.getAdmin(serverName, getMaster); - } - - public HRegionLocation getRegionLocation(TableName tableName, byte[] row, boolean reload) - throws IOException { - return delegate.getRegionLocation(tableName, row, reload); - } - - public HRegionLocation getRegionLocation(byte[] tableName, byte[] row, boolean reload) - throws IOException { - return delegate.getRegionLocation(tableName, row, reload); - } - - public void processBatch(List actions, TableName tableName, ExecutorService pool, - Object[] results) throws IOException, InterruptedException { - delegate.processBatch(actions, tableName, pool, results); - } - - public void processBatch(List actions, byte[] tableName, ExecutorService pool, - Object[] results) throws IOException, InterruptedException { - delegate.processBatch(actions, tableName, pool, results); - } - - public void processBatchCallback(List list, TableName tableName, - ExecutorService pool, Object[] results, Callback callback) throws IOException, - InterruptedException { - delegate.processBatchCallback(list, tableName, pool, results, callback); - } - - public void processBatchCallback(List list, byte[] tableName, - ExecutorService pool, Object[] results, Callback callback) throws IOException, - InterruptedException { - delegate.processBatchCallback(list, tableName, pool, results, callback); - } - - public void setRegionCachePrefetch(TableName tableName, boolean enable) { - delegate.setRegionCachePrefetch(tableName, enable); - } - - public void setRegionCachePrefetch(byte[] tableName, boolean enable) { - delegate.setRegionCachePrefetch(tableName, enable); - } - - public boolean getRegionCachePrefetch(TableName tableName) { - return delegate.getRegionCachePrefetch(tableName); - } - - public boolean getRegionCachePrefetch(byte[] tableName) { - return delegate.getRegionCachePrefetch(tableName); - } - - public int getCurrentNrHRS() throws IOException { - return delegate.getCurrentNrHRS(); - } - - public HTableDescriptor[] getHTableDescriptorsByTableName(List tableNames) - throws IOException { - return delegate.getHTableDescriptorsByTableName(tableNames); - } - - public HTableDescriptor[] getHTableDescriptors(List tableNames) throws IOException { - return delegate.getHTableDescriptors(tableNames); - } - - public boolean isClosed() { - return delegate.isClosed(); - } - - public void clearCaches(ServerName sn) { - delegate.clearCaches(sn); - } - - public void close() throws IOException { - delegate.close(); - } - - public void deleteCachedRegionLocation(HRegionLocation location) { - delegate.deleteCachedRegionLocation(location); - } - - public MasterKeepAliveConnection getKeepAliveMasterService() - throws MasterNotRunningException { - return delegate.getKeepAliveMasterService(); - } - - public boolean isDeadServer(ServerName serverName) { - return delegate.isDeadServer(serverName); - } - - @Override - public NonceGenerator getNonceGenerator() { - return null; // don't use nonces for coprocessor connection - } } \ No newline at end of file -- 1.8.5.2 (Apple Git-48)