Index: src/test/org/apache/hadoop/hbase/HBaseTestingUtility.java =================================================================== --- src/test/org/apache/hadoop/hbase/HBaseTestingUtility.java (revision 937144) +++ src/test/org/apache/hadoop/hbase/HBaseTestingUtility.java (working copy) @@ -68,6 +68,7 @@ private MiniDFSCluster dfsCluster = null; private MiniHBaseCluster hbaseCluster = null; private MiniMRCluster mrCluster = null; + // If non-null, then already a cluster running. private File clusterTestBuildDir = null; private HBaseAdmin hbaseAdmin = null; @@ -98,6 +99,36 @@ } /** + * Home our cluster in a dir under build/test. Give it a random name + * so can have many concurrent clusters running if we need to. Need to + * amend the test.build.data System property. Its what minidfscluster bases + * it data dir on. Moding a System property is not the way to do concurrent + * instances -- another instance could grab the temporary + * value unintentionally -- but not anything can do about it at moment; its + * how the minidfscluster works. + * @return The calculated cluster test build directory. + */ + File setupClusterTestBuildDir() { + String oldTestBuildDir = + System.getProperty(TEST_DIRECTORY_KEY, "build/test/data"); + String randomStr = UUID.randomUUID().toString(); + String dirStr = oldTestBuildDir + "." + randomStr; + File dir = new File(dirStr).getAbsoluteFile(); + // Have it cleaned up on exit + dir.deleteOnExit(); + return dir; + } + + /** + * @throws IOException If cluster already running. + */ + void isRunningCluster() throws IOException { + if (this.clusterTestBuildDir == null) return; + throw new IOException("Cluster already running at " + + this.clusterTestBuildDir); + } + + /** * @param subdirName * @return Path to a subdirectory named subdirName under * {@link #getTestDir()}. @@ -114,16 +145,35 @@ startMiniCluster(1); } + /** + * Call this if you only want a zk cluster. + * @see #startMiniZKCluster() if you want zk + dfs + hbase mini cluster. + * @throws Exception + * @see #shutdownMiniZKCluster() + */ public void startMiniZKCluster() throws Exception { - // Note that this is done before we create the MiniHBaseCluster because we - // need to edit the config to add the ZooKeeper servers. + isRunningCluster(); + this.clusterTestBuildDir = setupClusterTestBuildDir(); + startMiniZKCluster(this.clusterTestBuildDir); + + } + + private void startMiniZKCluster(final File dir) throws Exception { this.zkCluster = new MiniZooKeeperCluster(); - int clientPort = this.zkCluster.startup(this.clusterTestBuildDir); + int clientPort = this.zkCluster.startup(dir); this.conf.set("hbase.zookeeper.property.clientPort", Integer.toString(clientPort)); } /** + * @throws IOException + * @see #startMiniZKCluster() + */ + public void shutdownMiniZKCluster() throws IOException { + if (this.zkCluster != null) this.zkCluster.shutdown(); + } + + /** * Start up a minicluster of hbase, optinally dfs, and zookeeper. * Modifies Configuration. Homes the cluster data directory under a random * subdirectory in a directory under System property test.build.data. @@ -138,27 +188,13 @@ throws Exception { LOG.info("Starting up minicluster"); // If we already put up a cluster, fail. - if (this.clusterTestBuildDir != null) { - throw new IOException("Cluster already running at " + - this.clusterTestBuildDir); - } - // Now, home our cluster in a dir under build/test. Give it a random name - // so can have many concurrent clusters running if we need to. Need to - // amend the test.build.data System property. Its what minidfscluster bases - // it data dir on. Moding a System property is not the way to do concurrent - // instances -- another instance could grab the temporary - // value unintentionally -- but not anything can do about it at moment; its - // how the minidfscluster works. - String oldTestBuildDir = + isRunningCluster(); + String oldBuildTestDir = System.getProperty(TEST_DIRECTORY_KEY, "build/test/data"); - String randomStr = UUID.randomUUID().toString(); - String clusterTestBuildDirStr = oldTestBuildDir + "." + randomStr; - this.clusterTestBuildDir = - new File(clusterTestBuildDirStr).getAbsoluteFile(); - // Have it cleaned up on exit - this.clusterTestBuildDir.deleteOnExit(); + this.clusterTestBuildDir = setupClusterTestBuildDir(); + // Set our random dir while minidfscluster is being constructed. - System.setProperty(TEST_DIRECTORY_KEY, clusterTestBuildDirStr); + System.setProperty(TEST_DIRECTORY_KEY, this.clusterTestBuildDir.getPath()); // Bring up mini dfs cluster. This spews a bunch of warnings about missing // scheme. TODO: fix. // Complaints are 'Scheme is undefined for build/test/data/dfs/name1'. @@ -167,7 +203,8 @@ // Restore System property. minidfscluster accesses content of // the TEST_DIRECTORY_KEY to make bad blocks, a feature we are not using, // but otherwise, just in constructor. - System.setProperty(TEST_DIRECTORY_KEY, oldTestBuildDir); + System.setProperty(TEST_DIRECTORY_KEY, oldBuildTestDir); + // Mangle conf so fs parameter points to minidfs we just started up FileSystem fs = this.dfsCluster.getFileSystem(); this.conf.set("fs.defaultFS", fs.getUri().toString()); @@ -175,7 +212,7 @@ // It could be created before the cluster if(this.zkCluster == null) { - startMiniZKCluster(); + startMiniZKCluster(this.clusterTestBuildDir); } // Now do the mini hbase cluster. Set the hbase.rootdir in config. @@ -193,7 +230,7 @@ /** * @throws IOException - * @see {@link #startMiniCluster(boolean, int)} + * @see {@link #startMiniCluster(int)} */ public void shutdownMiniCluster() throws IOException { LOG.info("Shutting down minicluster"); @@ -202,7 +239,7 @@ // Wait till hbase is down before going on to shutdown zk. this.hbaseCluster.join(); } - if (this.zkCluster != null) this.zkCluster.shutdown(); + shutdownMiniZKCluster(); if (this.dfsCluster != null) { // The below throws an exception per dn, AsynchronousCloseException. this.dfsCluster.shutdown(); Index: src/test/org/apache/hadoop/hbase/master/NoNetworkMasterRegion.java =================================================================== --- src/test/org/apache/hadoop/hbase/master/NoNetworkMasterRegion.java (revision 0) +++ src/test/org/apache/hadoop/hbase/master/NoNetworkMasterRegion.java (revision 0) @@ -0,0 +1,41 @@ +package org.apache.hadoop.hbase.master; + +import java.io.IOException; + +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HMsg; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HServerAddress; +import org.apache.hadoop.hbase.HServerInfo; +import org.apache.hadoop.hbase.ipc.HMasterRegionInterface; +import org.apache.hadoop.io.MapWritable; + +public class NoNetworkMasterRegion implements HMasterRegionInterface { + private HMaster master; + + public NoNetworkMasterRegion(final HBaseConfiguration c, final HServerAddress a) { + // Nothing to do w/ params. + } + + public void setMaster(final HMaster m) { + this.master = m; + } + + @Override + public HMsg[] regionServerReport(HServerInfo info, HMsg[] msgs, + HRegionInfo[] mostLoadedRegions) + throws IOException { + return this.master.regionServerReport(info, msgs, mostLoadedRegions); + } + + @Override + public MapWritable regionServerStartup(HServerInfo info) throws IOException { + return this.master.regionServerStartup(info); + } + + @Override + public long getProtocolVersion(String protocol, long clientVersion) + throws IOException { + return this.master.getProtocolVersion(protocol, clientVersion); + } +} \ No newline at end of file Index: src/test/org/apache/hadoop/hbase/master/NoNetworkConnection.java =================================================================== --- src/test/org/apache/hadoop/hbase/master/NoNetworkConnection.java (revision 0) +++ src/test/org/apache/hadoop/hbase/master/NoNetworkConnection.java (revision 0) @@ -0,0 +1,182 @@ +package org.apache.hadoop.hbase.master; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; + +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.HServerAddress; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MasterNotRunningException; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.ServerCallable; +import org.apache.hadoop.hbase.client.ServerConnection; +import org.apache.hadoop.hbase.ipc.HMasterInterface; +import org.apache.hadoop.hbase.ipc.HRegionInterface; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper; + +/** + * An implementation that does not use the network -- it does not make RPC calls. + * Needs to implement ServerConnection too if used by Master. + */ +public class NoNetworkConnection implements ServerConnection, HConnection { + private final Map regionservers = + new HashMap(); + + public NoNetworkConnection(final HBaseConfiguration c) { + // Nothing to do. + } + + @Override + public void clearRegionCache() { + // TODO Auto-generated method stub + } + + @Override + public HRegionInterface getHRegionConnection(HServerAddress regionServer) + throws IOException { + return getHRegionConnection(regionServer, false); + } + + public void add(final HRegionServer rs) { + this.regionservers.put(rs.getAddress(), rs); + } + + @Override + public HRegionInterface getHRegionConnection(HServerAddress regionServer, + boolean getMaster) + throws IOException { + return this.regionservers.get(regionServer); + } + + @Override + public HTableDescriptor getHTableDescriptor(byte[] tableName) + throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public HMasterInterface getMaster() throws MasterNotRunningException { + // TODO Auto-generated method stub + return null; + } + + @Override + public HRegionLocation getRegionLocation(byte[] tableName, byte[] row, + boolean reload) + throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public T getRegionServerForWithoutRetries(ServerCallable callable) + throws IOException, RuntimeException { + // TODO Auto-generated method stub + return null; + } + + @Override + public T getRegionServerWithRetries(ServerCallable callable) + throws IOException, RuntimeException { + // TODO Auto-generated method stub + return null; + } + + @Override + public ZooKeeperWrapper getZooKeeperWrapper() throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public boolean isMasterRunning() { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean isTableAvailable(byte[] tableName) throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean isTableDisabled(byte[] tableName) throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean isTableEnabled(byte[] tableName) throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public HTableDescriptor[] listTables() throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public HRegionLocation locateRegion(byte[] tableName, byte[] row) + throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public int processBatchOfDeletes(List list, byte[] tableName) + throws IOException { + // TODO Auto-generated method stub + return 0; + } + + @Override + public void processBatchOfPuts(List list, byte[] tableName, + ExecutorService pool) throws IOException { + // TODO Auto-generated method stub + + } + + @Override + public int processBatchOfRows(ArrayList list, byte[] tableName) + throws IOException { + // TODO Auto-generated method stub + return 0; + } + + @Override + public HRegionLocation relocateRegion(byte[] tableName, byte[] row) + throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public boolean tableExists(byte[] tableName) throws MasterNotRunningException { + // TODO Auto-generated method stub + return false; + } + + @Override + public void setRootRegionLocation(HRegionLocation rootRegion) { + // TODO Auto-generated method stub + + } + + @Override + public void unsetRootRegionLocation() { + // TODO Auto-generated method stub + + } +} \ No newline at end of file Index: src/test/org/apache/hadoop/hbase/master/TestMaster.java =================================================================== --- src/test/org/apache/hadoop/hbase/master/TestMaster.java (revision 0) +++ src/test/org/apache/hadoop/hbase/master/TestMaster.java (revision 0) @@ -0,0 +1,182 @@ +package org.apache.hadoop.hbase.master; + +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertTrue; + +import java.io.IOException; +import java.net.DatagramSocket; +import java.net.ServerSocket; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HMsg; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HServerAddress; +import org.apache.hadoop.hbase.HServerInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.ipc.HMasterRegionInterface; +import org.apache.hadoop.hbase.ipc.ServerFactory; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mortbay.log.Log; + +/** + * Test HMaster. + * TODO: How to do random ports. + */ +public class TestMaster { + private HMaster master; + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private Map regionservers = + new HashMap(); + private static final HRegionInfo [] EMPTY_HRI_ARRAY = new HRegionInfo [] {}; + private static final HMsg [] EMPTY_HMSG_ARRAY = new HMsg [] {}; + + + @BeforeClass public static void beforeAllTests() throws Exception { + TEST_UTIL.startMiniZKCluster(); + } + + @AfterClass public static void afterAllTests() throws IOException { + TEST_UTIL.shutdownMiniZKCluster(); + } + + + /** + * Starts up a master. + * Fakes it out by sending in startup messages from two *servers*. The two + * fictional regionservers then fake the master into thinking that they have + * deployed .META. and -ROOT- regions. + * @throws IOException + */ + @Before + public void setup() throws IOException { + // Copy config. before I start messing with it + HBaseConfiguration c = new HBaseConfiguration(TEST_UTIL.getConfiguration()); + // Set an alternate to the RPC Server class, one that does not use the + // network. + c.set(ServerFactory.SERVER_CLASS_KEY, + "org.apache.hadoop.hbase.master.NoNetworkServer"); + // Set an alternate connection class, one that does not use network. + c.set(HConnectionManager.CONNECTION_CLASS_KEY, + "org.apache.hadoop.hbase.master.NoNetworkConnection"); + // Set the hbase.rootdir into config. + FileSystem fs = FileSystem.get(c); + Path hbaseRootdir = fs.makeQualified(fs.getHomeDirectory()); + c.set(HConstants.HBASE_DIR, hbaseRootdir.toString()); + c.setInt(HConstants.MASTER_PORT, 0); + + + // Start up master. + this.master = new HMaster(c); + this.master.start(); + // I need to add RegionServers to this connection so it can hook them. + NoNetworkConnection connection = (NoNetworkConnection)this.master.connection; + + // Bring up regionservers only have them also not use the network. + c.set(HRegionServer.MASTER_CLASS_KEY, + "org.apache.hadoop.hbase.master.NoNetworkMasterRegion"); + + addRegionServer(c, connection); + addRegionServer(c, connection); + // -ROOT- and .META. should be assigned at this stage. + } + + private HRegionServer addRegionServer(final HBaseConfiguration c, + final NoNetworkConnection connection) + throws IOException { + c.setInt(HRegionServer.REGIONSERVER_PORT, getFreePort()); + HRegionServer hrs = new TestHRegionServer(c, this.master); + connection.add(hrs); + HRegionServer.startRegionServer(hrs); + this.regionservers.put(hrs.getAddress(), hrs); + return hrs; + } + + static class TestHRegionServer extends HRegionServer { + private final HMaster master; + TestHRegionServer(final HBaseConfiguration c, final HMaster m) + throws IOException { + super(c); + this.master = m; + } + + @Override + protected HMasterRegionInterface setupMasterInterface() { + HMasterRegionInterface i = super.setupMasterInterface(); + ((NoNetworkMasterRegion)i).setMaster(this.master); + return i; + } + } + + @After + public void teardown() { + for (Map.Entry e: this.regionservers.entrySet()) { + e.getValue().stop(); + Log.info("Stopping regionserver " + e.getKey()); + } + if (this.master != null) this.master.shutdown(); + } + + /** + * @see HBASE-2428 + */ + @Test public void testRegionCloseWhenNoMeta() throws Exception { + // Push some random regions on to the master to assign. + HTableDescriptor htd = new HTableDescriptor("test_table"); + for(byte[] family: new byte [][] {Bytes.toBytes("one"), Bytes.toBytes("two"), + Bytes.toBytes("three")}) { + htd.addFamily(new HColumnDescriptor(family)); + } + // Add three regions to assign. + HRegionInfo hri = new HRegionInfo(htd, HConstants.EMPTY_START_ROW, + Bytes.toBytes("01")); + this.master.setUnassigned(hri); + hri = new HRegionInfo(htd, Bytes.toBytes("01"), Bytes.toBytes("02")); + this.master.setUnassigned(hri); + hri = new HRegionInfo(htd, Bytes.toBytes("02"), HConstants.EMPTY_END_ROW); + this.master.setUnassigned(hri); + } + + /** + * Checks to see if a specific port is available. + * Modified from apache mina available method. + * @return some random free port. + */ + public static int getFreePort() throws IOException { + ServerSocket ss = null; + DatagramSocket ds = null; + try { + ss = new ServerSocket(0); + ss.setReuseAddress(true); + ds = new DatagramSocket(ss.getLocalPort()); + ds.setReuseAddress(true); + return ss.getLocalPort(); + } finally { + if (ds != null) { + ds.close(); + } + if (ss != null) { + try { + ss.close(); + } catch (IOException e) { + /* should not be thrown */ + } + } + } + } +} Index: src/test/org/apache/hadoop/hbase/master/NoNetworkServer.java =================================================================== --- src/test/org/apache/hadoop/hbase/master/NoNetworkServer.java (revision 0) +++ src/test/org/apache/hadoop/hbase/master/NoNetworkServer.java (revision 0) @@ -0,0 +1,76 @@ +package org.apache.hadoop.hbase.master; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HServerAddress; +import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler; +import org.apache.hadoop.hbase.ipc.ServerInterface; +import org.apache.hadoop.io.Writable; + +/** + * An implementation that does not use the network -- does not do RPC calls. + */ +public class NoNetworkServer implements ServerInterface { + private final HBaseConfiguration conf; + private final InetSocketAddress address; + + public NoNetworkServer(final HBaseConfiguration c, final HServerAddress a) { + this.conf = c; + this.address = new InetSocketAddress(a.getHostname(), a.getPort()); + } + + @Override + public Writable call(Writable param, long receiveTime) throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public int getCallQueueLen() { + // TODO Auto-generated method stub + return 0; + } + + @Override + public InetSocketAddress getListenerAddress() { + return this.address; + } + + @Override + public int getNumOpenConnections() { + // TODO Auto-generated method stub + return 0; + } + + @Override + public void join() throws InterruptedException { + // TODO Auto-generated method stub + + } + + @Override + public void setErrorHandler(HBaseRPCErrorHandler handler) { + // TODO Auto-generated method stub + + } + + @Override + public void setSocketSendBufSize(int size) { + // TODO Auto-generated method stub + + } + + @Override + public void start() { + // TODO Auto-generated method stub + + } + + @Override + public void stop() { + // TODO Auto-generated method stub + + } +} Index: src/test/org/apache/hadoop/hbase/TestClusterStateChanges.java =================================================================== --- src/test/org/apache/hadoop/hbase/TestClusterStateChanges.java (revision 0) +++ src/test/org/apache/hadoop/hbase/TestClusterStateChanges.java (revision 0) @@ -0,0 +1,42 @@ +package org.apache.hadoop.hbase; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper; +import org.apache.hadoop.io.MapWritable; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.junit.*; + +import java.io.IOException; +import java.net.DatagramSocket; +import java.net.ServerSocket; + +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertTrue; + +/** + * Test state transitions on cluster. + */ +public class TestClusterStateChanges { + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static final HRegionInfo [] EMPTY_HRI_ARRAY = new HRegionInfo [] {}; + private static final HMsg [] EMPTY_HMSG_ARRAY = new HMsg [] {}; + + + @BeforeClass public static void beforeAllTests() throws Exception { + TEST_UTIL.startMiniCluster(3); + } + + @AfterClass public static void afterAllTests() throws IOException { + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * @see HBASE-2428 + */ + @Test public void testRegionCloseWhenNoMeta() throws Exception { + } +} \ No newline at end of file Index: src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 937144) +++ src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -89,9 +89,10 @@ import org.apache.hadoop.hbase.ipc.HBaseRPC; import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler; import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion; -import org.apache.hadoop.hbase.ipc.HBaseServer; import org.apache.hadoop.hbase.ipc.HMasterRegionInterface; import org.apache.hadoop.hbase.ipc.HRegionInterface; +import org.apache.hadoop.hbase.ipc.ServerFactory; +import org.apache.hadoop.hbase.ipc.ServerInterface; import org.apache.hadoop.hbase.regionserver.metrics.RegionServerMetrics; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; @@ -120,6 +121,8 @@ private static final HMsg REPORT_QUIESCED = new HMsg(Type.MSG_REPORT_QUIESCED); private static final HMsg [] EMPTY_HMSG_ARRAY = new HMsg [] {}; + public static final String MASTER_CLASS_KEY = "hbase.master.class"; + // Set when a report to the master comes back with a message asking us to // shutdown. Also set by call to stop when debugging or running unit tests // of HRegionServer in isolation. We use AtomicBoolean rather than @@ -171,7 +174,7 @@ // Server to handle client requests. Default access so can be accessed by // unit tests. - HBaseServer server; + ServerInterface server; // Leases private Leases leases; @@ -293,9 +296,7 @@ this.shutdownHDFS.set(true); // Server to handle client requests - this.server = HBaseRPC.getServer(this, address.getBindAddress(), - address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10), - false, conf); + this.server = ServerFactory.getServer(this, this.conf, this.address); this.server.setErrorHandler(this); // Address is giving a default IP for the moment. Will be changed after // calling the master. @@ -375,7 +376,7 @@ } else if (type == EventType.NodeDeleted) { watchMasterAddress(); } else if (type == EventType.NodeCreated) { - getMaster(); + this.hbaseMaster = setupMasterInterface(); // ZooKeeper watches are one time only, so we need to re-register our watch. watchMasterAddress(); @@ -764,7 +765,13 @@ // Master may have sent us a new address with the other configs. // Update our address in this case. See HBASE-719 String hra = conf.get("hbase.regionserver.address"); - if (address != null) { + // TODO: The below used to be this.address != null. Was broken by what + // looks like a mistake in: + // + // HBASE-1215 migration; metautils scan of meta region was broken; wouldn't see first row + // ------------------------------------------------------------------------ + // r796326 | stack | 2009-07-21 07:40:34 -0700 (Tue, 21 Jul 2009) | 38 lines + if (hra != null) { HServerAddress hsa = new HServerAddress (hra, this.serverInfo.getServerAddress().getPort()); LOG.info("Master passed us address to use. Was=" + @@ -1276,11 +1283,11 @@ Threads.shutdown(this.hlogRoller); } - private boolean getMaster() { + protected HMasterRegionInterface setupMasterInterface() { HServerAddress masterAddress = null; while (masterAddress == null) { if (stopRequested.get()) { - return false; + return null; } try { masterAddress = zooKeeperWrapper.readMasterAddressOrThrow(); @@ -1297,25 +1304,60 @@ try { // Do initial RPC setup. The final argument indicates that the RPC // should retry indefinitely. - master = (HMasterRegionInterface)HBaseRPC.waitForProxy( - HMasterRegionInterface.class, HBaseRPCProtocolVersion.versionID, - masterAddress.getInetSocketAddress(), - this.conf, -1, this.rpcTimeout); + master = getMaster(this.conf, masterAddress, this.rpcTimeout); } catch (IOException e) { LOG.warn("Unable to connect to master. Retrying. Error was:", e); sleeper.sleep(); } } - this.hbaseMaster = master; - return true; + return master; } /* + * By default returns an instance of RPC proxy on {@link HMasterRegionInterface}. + * To customize, pass the name of an implementation of + * {@link HMasterRegionInterface} that has a constructor that takes + * {@link HBaseConfiguration} and {@link HServerAddress}. + * @param c + * @param a + * @param rpcTimeout + * @return Instance of {@link HMasterRegionInterface}. + * @throws IOException + */ + @SuppressWarnings("unchecked") + private HMasterRegionInterface getMaster(final HBaseConfiguration c, + final HServerAddress a, final long rpcTimeout) + throws IOException { + String masterClass = c.get(MASTER_CLASS_KEY, null); + // If an alternate class supplied, make an instance. Presume that it + // has a constructor that takes the Configuration and an HServerAddress. + if (masterClass != null) { + try { + Class cls = Class.forName(masterClass); + Constructor constructor = + cls.getConstructor(new Class [] {HBaseConfiguration.class, HServerAddress.class}); + return (HMasterRegionInterface)constructor.newInstance(new Object [] {c, a}); + } catch (Exception e) { + if (e instanceof IOException) throw (IOException)e; + else { + IOException ioe = new IOException(); + ioe.initCause(e); + } + } + } + return (HMasterRegionInterface)HBaseRPC.waitForProxy( + HMasterRegionInterface.class, HBaseRPCProtocolVersion.versionID, + a.getInetSocketAddress(), c, -1, rpcTimeout); + } + + /* * Let the master know we're here * Run initialization using parameters passed us by the master. */ private MapWritable reportForDuty() { - while (!stopRequested.get() && !getMaster()) { + while (!stopRequested.get()) { + this.hbaseMaster = setupMasterInterface(); + if (this.hbaseMaster != null) break; sleeper.sleep(); LOG.warn("Unable to get master for initialization"); } @@ -2399,10 +2441,90 @@ return fs; } + /** + * @return The address of this server. + */ + public HServerAddress getAddress() { return this.address; } + + /** {@inheritDoc} */ + public long incrementColumnValue(byte [] regionName, byte [] row, + byte [] family, byte [] qualifier, long amount, boolean writeToWAL) + throws IOException { + checkOpen(); + + if (regionName == null) { + throw new IOException("Invalid arguments to incrementColumnValue " + + "regionName is null"); + } + requestCount.incrementAndGet(); + try { + HRegion region = getRegion(regionName); + long retval = region.incrementColumnValue(row, family, qualifier, amount, + writeToWAL); + + return retval; + } catch (IOException e) { + checkFileSystem(); + throw e; + } + } + + /** {@inheritDoc} */ + public HRegionInfo[] getRegionsAssignment() throws IOException { + HRegionInfo[] regions = new HRegionInfo[onlineRegions.size()]; + Iterator ite = onlineRegions.values().iterator(); + for(int i = 0; ite.hasNext(); i++) { + regions[i] = ite.next().getRegionInfo(); + } + return regions; + } + + /** {@inheritDoc} */ + public HServerInfo getHServerInfo() throws IOException { + return serverInfo; + } + + @Override + public MultiPutResponse multiPut(MultiPut puts) throws IOException { + MultiPutResponse resp = new MultiPutResponse(); + + // do each region as it's own. + for( Map.Entry> e: puts.puts.entrySet()) { + int result = put(e.getKey(), e.getValue().toArray(new Put[]{})); + resp.addResult(e.getKey(), result); + + e.getValue().clear(); // clear some RAM + } + + return resp; + } + // // Main program and support routines // + + /** + * @param hrs + * @return Thread the RegionServer is running in correctly named. + */ + public static Thread startRegionServer(final HRegionServer hrs) { + return startRegionServer(hrs, + "regionserver" + hrs.server.getListenerAddress()); + } + /** + * @param hrs + * @param name + * @return Thread the RegionServer is running in correctly named. + */ + public static Thread startRegionServer(final HRegionServer hrs, + final String name) { + Thread t = new Thread(hrs); + t.setName(name); + t.start(); + return t; + } + private static void printUsageAndExit() { printUsageAndExit(null); } @@ -2414,7 +2536,7 @@ System.err.println("Usage: java org.apache.hbase.HRegionServer start|stop"); System.exit(0); } - + /** * Do class main. * @param args @@ -2444,10 +2566,7 @@ } Constructor c = regionServerClass.getConstructor(HBaseConfiguration.class); - HRegionServer hrs = c.newInstance(conf); - Thread t = new Thread(hrs); - t.setName("regionserver" + hrs.server.getListenerAddress()); - t.start(); + startRegionServer(c.newInstance(conf)); } } catch (Throwable t) { LOG.error( "Can not start region server because "+ @@ -2466,45 +2585,7 @@ printUsageAndExit(); } } - - /** {@inheritDoc} */ - public long incrementColumnValue(byte [] regionName, byte [] row, - byte [] family, byte [] qualifier, long amount, boolean writeToWAL) - throws IOException { - checkOpen(); - if (regionName == null) { - throw new IOException("Invalid arguments to incrementColumnValue " + - "regionName is null"); - } - requestCount.incrementAndGet(); - try { - HRegion region = getRegion(regionName); - long retval = region.incrementColumnValue(row, family, qualifier, amount, - writeToWAL); - - return retval; - } catch (IOException e) { - checkFileSystem(); - throw e; - } - } - - /** {@inheritDoc} */ - public HRegionInfo[] getRegionsAssignment() throws IOException { - HRegionInfo[] regions = new HRegionInfo[onlineRegions.size()]; - Iterator ite = onlineRegions.values().iterator(); - for(int i = 0; ite.hasNext(); i++) { - regions[i] = ite.next().getRegionInfo(); - } - return regions; - } - - /** {@inheritDoc} */ - public HServerInfo getHServerInfo() throws IOException { - return serverInfo; - } - /** * @param args */ @@ -2516,20 +2597,4 @@ HRegionServer.class); doMain(args, regionServerClass); } - - - @Override - public MultiPutResponse multiPut(MultiPut puts) throws IOException { - MultiPutResponse resp = new MultiPutResponse(); - - // do each region as it's own. - for( Map.Entry> e: puts.puts.entrySet()) { - int result = put(e.getKey(), e.getValue().toArray(new Put[]{})); - resp.addResult(e.getKey(), result); - - e.getValue().clear(); // clear some RAM - } - - return resp; - } } Index: src/java/org/apache/hadoop/hbase/master/HMaster.java =================================================================== --- src/java/org/apache/hadoop/hbase/master/HMaster.java (revision 937144) +++ src/java/org/apache/hadoop/hbase/master/HMaster.java (working copy) @@ -19,8 +19,8 @@ */ package org.apache.hadoop.hbase.master; +import java.io.File; import java.io.IOException; -import java.io.File; import java.lang.management.ManagementFactory; import java.lang.management.RuntimeMXBean; import java.lang.reflect.Constructor; @@ -33,8 +33,8 @@ import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.DelayQueue; +import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; -import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; @@ -56,9 +56,9 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.LocalHBaseCluster; import org.apache.hadoop.hbase.MasterNotRunningException; +import org.apache.hadoop.hbase.MiniZooKeeperCluster; import org.apache.hadoop.hbase.RemoteExceptionHandler; import org.apache.hadoop.hbase.TableExistsException; -import org.apache.hadoop.hbase.MiniZooKeeperCluster; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.Result; @@ -66,12 +66,13 @@ import org.apache.hadoop.hbase.client.ServerConnection; import org.apache.hadoop.hbase.client.ServerConnectionManager; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.ipc.HBaseRPC; import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion; import org.apache.hadoop.hbase.ipc.HBaseServer; import org.apache.hadoop.hbase.ipc.HMasterInterface; import org.apache.hadoop.hbase.ipc.HMasterRegionInterface; import org.apache.hadoop.hbase.ipc.HRegionInterface; +import org.apache.hadoop.hbase.ipc.ServerFactory; +import org.apache.hadoop.hbase.ipc.ServerInterface; import org.apache.hadoop.hbase.master.metrics.MasterMetrics; import org.apache.hadoop.hbase.regionserver.HLog; import org.apache.hadoop.hbase.regionserver.HRegion; @@ -80,8 +81,8 @@ import org.apache.hadoop.hbase.util.InfoServer; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Sleeper; +import org.apache.hadoop.hbase.util.VersionInfo; import org.apache.hadoop.hbase.util.Writables; -import org.apache.hadoop.hbase.util.VersionInfo; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper; import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Text; @@ -134,7 +135,7 @@ volatile BlockingQueue toDoQueue = new PriorityBlockingQueue(); - private final HBaseServer server; + private final ServerInterface server; private final HServerAddress address; final ServerConnection connection; @@ -174,8 +175,8 @@ conf.get("hbase.master.dns.nameserver","default")); addressStr += ":" + conf.get(MASTER_PORT, Integer.toString(DEFAULT_MASTER_PORT)); - HServerAddress address = new HServerAddress(addressStr); - LOG.info("My address is " + address); + HServerAddress hsa = new HServerAddress(addressStr); + LOG.info("My address is " + hsa); this.conf = conf; this.rootdir = new Path(conf.get(HBASE_DIR)); @@ -220,9 +221,7 @@ this.maxRegionOpenTime = conf.getLong("hbase.hbasemaster.maxregionopen", 120 * 1000); this.leaseTimeout = conf.getInt("hbase.master.lease.period", 120 * 1000); - this.server = HBaseRPC.getServer(this, address.getBindAddress(), - address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10), - false, conf); + this.server = ServerFactory.getServer(this, this.conf, hsa); // The rpc-server port can be ephemeral... ensure we have the correct info this.address = new HServerAddress(server.getListenerAddress()); @@ -376,6 +375,14 @@ } /** + * Used testing. + * @param hri Region to set unassigned. + */ + void setUnassigned(final HRegionInfo hri) { + this.regionManager.setUnassigned(hri, false); + } + + /** * @return Location of the -ROOT- region. */ public HServerAddress getRootRegionLocation() { Index: src/java/org/apache/hadoop/hbase/ipc/HBaseClient.java =================================================================== --- src/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (revision 937144) +++ src/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (working copy) @@ -380,8 +380,8 @@ * Out is not synchronized because only the first thread does this. */ private void writeHeader() throws IOException { - out.write(HBaseServer.HEADER.array()); - out.write(HBaseServer.CURRENT_VERSION); + out.write(ServerInterface.HEADER.array()); + out.write(ServerInterface.CURRENT_VERSION); //When there are more fields we can have ConnectionHeader Writable. DataOutputBuffer buf = new DataOutputBuffer(); ObjectWritable.writeObject(buf, remoteId.getTicket(), Index: src/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java =================================================================== --- src/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (revision 937144) +++ src/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (working copy) @@ -29,10 +29,6 @@ import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.SocketTimeoutException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Comparator; import java.util.HashMap; import java.util.Map; Index: src/java/org/apache/hadoop/hbase/ipc/ServerInterface.java =================================================================== --- src/java/org/apache/hadoop/hbase/ipc/ServerInterface.java (revision 0) +++ src/java/org/apache/hadoop/hbase/ipc/ServerInterface.java (revision 0) @@ -0,0 +1,81 @@ +package org.apache.hadoop.hbase.ipc; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import org.apache.hadoop.io.Writable; + +/** + * Server. + * TODO: Don't need this many of the {@link HBaseServer} methods. Cut them down. + */ +public interface ServerInterface { + /** + * The first four bytes of Hadoop RPC connections + */ + public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes()); + // 1 : Introduce ping and server does not throw away RPCs + // 3 : RPC was refactored in 0.19 + public static final byte CURRENT_VERSION = 3; + + /** + * Sets the socket buffer size used for responding to RPCs. + * + * @param size + */ + public abstract void setSocketSendBufSize(int size); + + /** Starts the service. Must be called before any calls will be handled. */ + public abstract void start(); + + /** Stops the service. No new calls will be handled after this is called. */ + public abstract void stop(); + + /** + * Wait for the server to be stopped. Does not wait for all subthreads to + * finish. See {@link #stop()}. + * + * @throws InterruptedException + */ + public abstract void join() throws InterruptedException; + + /** + * Return the socket (ip+port) on which the RPC server is listening to. + * + * @return the socket (ip+port) on which the RPC server is listening to. + */ + public abstract InetSocketAddress getListenerAddress(); + + /** + * Called for each call. + * + * @param param + * @param receiveTime + * @return Writable + * @throws IOException + */ + public abstract Writable call(Writable param, long receiveTime) + throws IOException; + + /** + * The number of open RPC conections + * + * @return the number of open rpc connections + */ + public abstract int getNumOpenConnections(); + + /** + * The number of rpc calls in the queue. + * + * @return The number of rpc calls in the queue. + */ + public abstract int getCallQueueLen(); + + /** + * Set the handler for calling out of RPC for error conditions. + * + * @param handler + * the handler implementation + */ + public abstract void setErrorHandler(HBaseRPCErrorHandler handler); +} \ No newline at end of file Index: src/java/org/apache/hadoop/hbase/ipc/HBaseServer.java =================================================================== --- src/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (revision 937144) +++ src/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (working copy) @@ -67,25 +67,15 @@ * * @see HBaseClient */ -public abstract class HBaseServer { +public abstract class HBaseServer implements ServerInterface { + public static final Log LOG = + LogFactory.getLog("org.apache.hadoop.ipc.HBaseServer"); /** - * The first four bytes of Hadoop RPC connections - */ - public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes()); - - // 1 : Introduce ping and server does not throw away RPCs - // 3 : RPC was refactored in 0.19 - public static final byte CURRENT_VERSION = 3; - - /** * How many calls/handler are allowed in the queue. */ private static final int MAX_QUEUE_SIZE_PER_HANDLER = 100; - public static final Log LOG = - LogFactory.getLog("org.apache.hadoop.ipc.HBaseServer"); - protected static final ThreadLocal SERVER = new ThreadLocal(); @@ -95,7 +85,7 @@ * the server context. * @return HBaseServer */ - public static HBaseServer get() { + public static ServerInterface get() { return SERVER.get(); } @@ -1014,12 +1004,14 @@ connection.close(); } - /** Sets the socket buffer size used for responding to RPCs. - * @param size - */ + /* (non-Javadoc) + * @see org.apache.hadoop.hbase.ipc.ServerInterface#setSocketSendBufSize(int) + */ public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; } - /** Starts the service. Must be called before any calls will be handled. */ + /* (non-Javadoc) + * @see org.apache.hadoop.hbase.ipc.ServerInterface#start() + */ public synchronized void start() { responder.start(); listener.start(); @@ -1031,7 +1023,9 @@ } } - /** Stops the service. No new calls will be handled after this is called. */ + /* (non-Javadoc) + * @see org.apache.hadoop.hbase.ipc.ServerInterface#stop() + */ public synchronized void stop() { LOG.info("Stopping server on " + port); running = false; @@ -1051,54 +1045,45 @@ } } - /** Wait for the server to be stopped. - * Does not wait for all subthreads to finish. - * See {@link #stop()}. - * @throws InterruptedException - */ + /* (non-Javadoc) + * @see org.apache.hadoop.hbase.ipc.ServerInterface#join() + */ public synchronized void join() throws InterruptedException { while (running) { wait(); } } - /** - * Return the socket (ip+port) on which the RPC server is listening to. - * @return the socket (ip+port) on which the RPC server is listening to. - */ + /* (non-Javadoc) + * @see org.apache.hadoop.hbase.ipc.ServerInterface#getListenerAddress() + */ public synchronized InetSocketAddress getListenerAddress() { return listener.getAddress(); } - /** Called for each call. - * @param param - * @param receiveTime - * @return Writable - * @throws IOException - */ + /* (non-Javadoc) + * @see org.apache.hadoop.hbase.ipc.ServerInterface#call(org.apache.hadoop.io.Writable, long) + */ public abstract Writable call(Writable param, long receiveTime) throws IOException; - /** - * The number of open RPC conections - * @return the number of open rpc connections - */ + /* (non-Javadoc) + * @see org.apache.hadoop.hbase.ipc.ServerInterface#getNumOpenConnections() + */ public int getNumOpenConnections() { return numConnections; } - /** - * The number of rpc calls in the queue. - * @return The number of rpc calls in the queue. - */ + /* (non-Javadoc) + * @see org.apache.hadoop.hbase.ipc.ServerInterface#getCallQueueLen() + */ public int getCallQueueLen() { return callQueue.size(); } - /** - * Set the handler for calling out of RPC for error conditions. - * @param handler the handler implementation - */ + /* (non-Javadoc) + * @see org.apache.hadoop.hbase.ipc.ServerInterface#setErrorHandler(org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler) + */ public void setErrorHandler(HBaseRPCErrorHandler handler) { this.errorHandler = handler; } Index: src/java/org/apache/hadoop/hbase/ipc/ServerFactory.java =================================================================== --- src/java/org/apache/hadoop/hbase/ipc/ServerFactory.java (revision 0) +++ src/java/org/apache/hadoop/hbase/ipc/ServerFactory.java (revision 0) @@ -0,0 +1,53 @@ +package org.apache.hadoop.hbase.ipc; + +import java.io.IOException; +import java.lang.reflect.Constructor; + +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HServerAddress; + +/** + * A Server Factory. + * If hbase.server.class is set in the passed configuration, + * its presumed its an implementation of {@link ServerInterface} and that it + * has a constructor that takes an HBaseConfiguration and a HServerAdddress. + * This factory will instantiate an instance of this configured class. + * Otherwise it returns result of + * {@link HBaseRPC#getServer(Object, String, int, int, boolean, org.apache.hadoop.conf.Configuration)} + */ +public class ServerFactory { + public static final String SERVER_CLASS_KEY = "hbase.server.interface.class"; + + /** + * Return a {@link ServerInterface} implementation. + * @param instance + * @param c Configuration + * @param a Address + * @return An ServerInterface implementation. + * @throws IOException + */ + @SuppressWarnings("unchecked") + public static ServerInterface getServer(final Object instance, + final HBaseConfiguration c, final HServerAddress a) + throws IOException { + String serverClass = c.get("hbase.server.class", null); + // If an alternate class supplied, make an instance. Presume that it + // has a constructor that takes the Configuration and an HServerAddress. + if (serverClass != null) { + try { + Class cls = Class.forName(serverClass); + Constructor constructor = + cls.getConstructor(new Class [] {HBaseConfiguration.class, HServerAddress.class}); + return (ServerInterface)constructor.newInstance(new Object [] {c, a}); + } catch (Exception e) { + if (e instanceof IOException) throw (IOException)e; + else { + IOException ioe = new IOException(); + ioe.initCause(e); + } + } + } + return HBaseRPC.getServer(instance, a.getBindAddress(), a.getPort(), + c.getInt("hbase.regionserver.handler.count", 25), false, c); + } +} Index: src/java/org/apache/hadoop/hbase/client/HConnectionManager.java =================================================================== --- src/java/org/apache/hadoop/hbase/client/HConnectionManager.java (revision 937144) +++ src/java/org/apache/hadoop/hbase/client/HConnectionManager.java (working copy) @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client; import java.io.IOException; +import java.lang.reflect.Constructor; import java.lang.reflect.UndeclaredThrowableException; import java.util.ArrayList; import java.util.Collections; @@ -28,11 +29,11 @@ import java.util.List; import java.util.Map; import java.util.TreeSet; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; @@ -69,9 +70,12 @@ * * Used by {@link HTable} and {@link HBaseAdmin} */ +@SuppressWarnings("serial") public class HConnectionManager implements HConstants { private static final Delete [] DELETE_ARRAY_TYPE = new Delete[0]; private static final Put [] PUT_ARRAY_TYPE = new Put[0]; + public static final String CONNECTION_CLASS_KEY = + "hbase.connection.interface.class"; // Register a shutdown hook, one that cleans up RPC and closes zk sessions. static { @@ -114,7 +118,20 @@ * @param conf * @return HConnection object for the instance specified by the configuration */ + @SuppressWarnings("unchecked") public static HConnection getConnection(HBaseConfiguration conf) { + String className = conf.get(CONNECTION_CLASS_KEY, null); + if (className != null) { + try { + Class cls = Class.forName(className); + Constructor constructor = + cls.getConstructor(new Class [] {HBaseConfiguration.class}); + return (HConnection)constructor.newInstance(new Object [] {conf}); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + // Else do default. TableServers connection; synchronized (HBASE_INSTANCES) { connection = HBASE_INSTANCES.get(conf); @@ -125,7 +142,7 @@ } return connection; } - + /** * Delete connection information for the instance specified by configuration * @param conf @@ -942,10 +959,7 @@ server = this.servers.get(regionServer.toString()); if (server == null) { // Get a connection try { - server = (HRegionInterface)HBaseRPC.waitForProxy( - serverInterfaceClass, HBaseRPCProtocolVersion.versionID, - regionServer.getInetSocketAddress(), this.conf, - this.maxRPCAttempts, this.rpcTimeout); + server = createHRegionServer(regionServer); } catch (RemoteException e) { throw RemoteExceptionHandler.decodeRemoteException(e); } @@ -954,7 +968,20 @@ } return server; } - + + /* + * @param regionServer + * @return Implementation of HRegionInterface. + * @throws IOException + */ + private HRegionInterface createHRegionServer(final HServerAddress regionServer) + throws IOException { + return (HRegionInterface) HBaseRPC.waitForProxy( + this.serverInterfaceClass, HBaseRPCProtocolVersion.versionID, + regionServer.getInetSocketAddress(), this.conf, this.maxRPCAttempts, + this.rpcTimeout); + } + public HRegionInterface getHRegionConnection( HServerAddress regionServer) throws IOException {