Index: src/test/org/apache/hadoop/hbase/HBaseTestingUtility.java
===================================================================
--- src/test/org/apache/hadoop/hbase/HBaseTestingUtility.java (revision 937144)
+++ src/test/org/apache/hadoop/hbase/HBaseTestingUtility.java (working copy)
@@ -68,6 +68,7 @@
private MiniDFSCluster dfsCluster = null;
private MiniHBaseCluster hbaseCluster = null;
private MiniMRCluster mrCluster = null;
+ // If non-null, then already a cluster running.
private File clusterTestBuildDir = null;
private HBaseAdmin hbaseAdmin = null;
@@ -98,6 +99,36 @@
}
/**
+ * Home our cluster in a dir under build/test. Give it a random name
+ * so we can have many concurrent clusters running if we need to. We need to
+ * amend the test.build.data System property. It's what minidfscluster bases
+ * its data dir on. Modifying a System property is not the way to do concurrent
+ * instances -- another instance could grab the temporary
+ * value unintentionally -- but there is nothing we can do about it at the
+ * moment; it's how the minidfscluster works.
+ * @return The calculated cluster test build directory.
+ */
+ File setupClusterTestBuildDir() {
+ String oldTestBuildDir =
+ System.getProperty(TEST_DIRECTORY_KEY, "build/test/data");
+ String randomStr = UUID.randomUUID().toString();
+ String dirStr = oldTestBuildDir + "." + randomStr;
+ File dir = new File(dirStr).getAbsoluteFile();
+ // Have it cleaned up on exit
+ dir.deleteOnExit();
+ return dir;
+ }
+
+ /**
+ * @throws IOException If cluster already running.
+ */
+ void isRunningCluster() throws IOException {
+ if (this.clusterTestBuildDir == null) return;
+ throw new IOException("Cluster already running at " +
+ this.clusterTestBuildDir);
+ }
+
+ /**
* @param subdirName
* @return Path to a subdirectory named subdirName under
* {@link #getTestDir()}.
@@ -114,16 +145,35 @@
startMiniCluster(1);
}
+ /**
+ * Call this if you only want a zk cluster.
+ * @see #startMiniCluster(int) if you want zk + dfs + hbase mini cluster.
+ * @throws Exception
+ * @see #shutdownMiniZKCluster()
+ */
public void startMiniZKCluster() throws Exception {
- // Note that this is done before we create the MiniHBaseCluster because we
- // need to edit the config to add the ZooKeeper servers.
+ isRunningCluster();
+ this.clusterTestBuildDir = setupClusterTestBuildDir();
+ startMiniZKCluster(this.clusterTestBuildDir);
+
+ }
+
+ private void startMiniZKCluster(final File dir) throws Exception {
this.zkCluster = new MiniZooKeeperCluster();
- int clientPort = this.zkCluster.startup(this.clusterTestBuildDir);
+ int clientPort = this.zkCluster.startup(dir);
this.conf.set("hbase.zookeeper.property.clientPort",
Integer.toString(clientPort));
}
/**
+ * @throws IOException
+ * @see #startMiniZKCluster()
+ */
+ public void shutdownMiniZKCluster() throws IOException {
+ if (this.zkCluster != null) this.zkCluster.shutdown();
+ }
+
+ /**
* Start up a minicluster of hbase, optinally dfs, and zookeeper.
* Modifies Configuration. Homes the cluster data directory under a random
* subdirectory in a directory under System property test.build.data.
@@ -138,27 +188,13 @@
throws Exception {
LOG.info("Starting up minicluster");
// If we already put up a cluster, fail.
- if (this.clusterTestBuildDir != null) {
- throw new IOException("Cluster already running at " +
- this.clusterTestBuildDir);
- }
- // Now, home our cluster in a dir under build/test. Give it a random name
- // so can have many concurrent clusters running if we need to. Need to
- // amend the test.build.data System property. Its what minidfscluster bases
- // it data dir on. Moding a System property is not the way to do concurrent
- // instances -- another instance could grab the temporary
- // value unintentionally -- but not anything can do about it at moment; its
- // how the minidfscluster works.
- String oldTestBuildDir =
+ isRunningCluster();
+ String oldBuildTestDir =
System.getProperty(TEST_DIRECTORY_KEY, "build/test/data");
- String randomStr = UUID.randomUUID().toString();
- String clusterTestBuildDirStr = oldTestBuildDir + "." + randomStr;
- this.clusterTestBuildDir =
- new File(clusterTestBuildDirStr).getAbsoluteFile();
- // Have it cleaned up on exit
- this.clusterTestBuildDir.deleteOnExit();
+ this.clusterTestBuildDir = setupClusterTestBuildDir();
+
// Set our random dir while minidfscluster is being constructed.
- System.setProperty(TEST_DIRECTORY_KEY, clusterTestBuildDirStr);
+ System.setProperty(TEST_DIRECTORY_KEY, this.clusterTestBuildDir.getPath());
// Bring up mini dfs cluster. This spews a bunch of warnings about missing
// scheme. TODO: fix.
// Complaints are 'Scheme is undefined for build/test/data/dfs/name1'.
@@ -167,7 +203,8 @@
// Restore System property. minidfscluster accesses content of
// the TEST_DIRECTORY_KEY to make bad blocks, a feature we are not using,
// but otherwise, just in constructor.
- System.setProperty(TEST_DIRECTORY_KEY, oldTestBuildDir);
+ System.setProperty(TEST_DIRECTORY_KEY, oldBuildTestDir);
+
// Mangle conf so fs parameter points to minidfs we just started up
FileSystem fs = this.dfsCluster.getFileSystem();
this.conf.set("fs.defaultFS", fs.getUri().toString());
@@ -175,7 +212,7 @@
// It could be created before the cluster
if(this.zkCluster == null) {
- startMiniZKCluster();
+ startMiniZKCluster(this.clusterTestBuildDir);
}
// Now do the mini hbase cluster. Set the hbase.rootdir in config.
@@ -193,7 +230,7 @@
/**
* @throws IOException
- * @see {@link #startMiniCluster(boolean, int)}
+ * @see {@link #startMiniCluster(int)}
*/
public void shutdownMiniCluster() throws IOException {
LOG.info("Shutting down minicluster");
@@ -202,7 +239,7 @@
// Wait till hbase is down before going on to shutdown zk.
this.hbaseCluster.join();
}
- if (this.zkCluster != null) this.zkCluster.shutdown();
+ shutdownMiniZKCluster();
if (this.dfsCluster != null) {
// The below throws an exception per dn, AsynchronousCloseException.
this.dfsCluster.shutdown();
Index: src/test/org/apache/hadoop/hbase/master/NoNetworkMasterRegion.java
===================================================================
--- src/test/org/apache/hadoop/hbase/master/NoNetworkMasterRegion.java (revision 0)
+++ src/test/org/apache/hadoop/hbase/master/NoNetworkMasterRegion.java (revision 0)
@@ -0,0 +1,41 @@
+package org.apache.hadoop.hbase.master;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HMsg;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.HServerInfo;
+import org.apache.hadoop.hbase.ipc.HMasterRegionInterface;
+import org.apache.hadoop.io.MapWritable;
+
+public class NoNetworkMasterRegion implements HMasterRegionInterface {
+ private HMaster master;
+
+ public NoNetworkMasterRegion(final HBaseConfiguration c, final HServerAddress a) {
+ // Nothing to do w/ params.
+ }
+
+ public void setMaster(final HMaster m) {
+ this.master = m;
+ }
+
+ @Override
+ public HMsg[] regionServerReport(HServerInfo info, HMsg[] msgs,
+ HRegionInfo[] mostLoadedRegions)
+ throws IOException {
+ return this.master.regionServerReport(info, msgs, mostLoadedRegions);
+ }
+
+ @Override
+ public MapWritable regionServerStartup(HServerInfo info) throws IOException {
+ return this.master.regionServerStartup(info);
+ }
+
+ @Override
+ public long getProtocolVersion(String protocol, long clientVersion)
+ throws IOException {
+ return this.master.getProtocolVersion(protocol, clientVersion);
+ }
+}
\ No newline at end of file
Index: src/test/org/apache/hadoop/hbase/master/NoNetworkConnection.java
===================================================================
--- src/test/org/apache/hadoop/hbase/master/NoNetworkConnection.java (revision 0)
+++ src/test/org/apache/hadoop/hbase/master/NoNetworkConnection.java (revision 0)
@@ -0,0 +1,182 @@
+package org.apache.hadoop.hbase.master;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.ServerCallable;
+import org.apache.hadoop.hbase.client.ServerConnection;
+import org.apache.hadoop.hbase.ipc.HMasterInterface;
+import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
+
+/**
+ * An implementation that does not use the network -- it does not make RPC calls.
+ * Needs to implement ServerConnection too if used by Master.
+ */
+public class NoNetworkConnection implements ServerConnection, HConnection {
+ private final Map regionservers =
+ new HashMap();
+
+ public NoNetworkConnection(final HBaseConfiguration c) {
+ // Nothing to do.
+ }
+
+ @Override
+ public void clearRegionCache() {
+ // TODO Auto-generated method stub
+ }
+
+ @Override
+ public HRegionInterface getHRegionConnection(HServerAddress regionServer)
+ throws IOException {
+ return getHRegionConnection(regionServer, false);
+ }
+
+ public void add(final HRegionServer rs) {
+ this.regionservers.put(rs.getAddress(), rs);
+ }
+
+ @Override
+ public HRegionInterface getHRegionConnection(HServerAddress regionServer,
+ boolean getMaster)
+ throws IOException {
+ return this.regionservers.get(regionServer);
+ }
+
+ @Override
+ public HTableDescriptor getHTableDescriptor(byte[] tableName)
+ throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public HMasterInterface getMaster() throws MasterNotRunningException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public HRegionLocation getRegionLocation(byte[] tableName, byte[] row,
+ boolean reload)
+ throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public T getRegionServerForWithoutRetries(ServerCallable callable)
+ throws IOException, RuntimeException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public T getRegionServerWithRetries(ServerCallable callable)
+ throws IOException, RuntimeException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public ZooKeeperWrapper getZooKeeperWrapper() throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public boolean isMasterRunning() {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public boolean isTableAvailable(byte[] tableName) throws IOException {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public boolean isTableDisabled(byte[] tableName) throws IOException {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public boolean isTableEnabled(byte[] tableName) throws IOException {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public HTableDescriptor[] listTables() throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public HRegionLocation locateRegion(byte[] tableName, byte[] row)
+ throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public int processBatchOfDeletes(List list, byte[] tableName)
+ throws IOException {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public void processBatchOfPuts(List list, byte[] tableName,
+ ExecutorService pool) throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public int processBatchOfRows(ArrayList list, byte[] tableName)
+ throws IOException {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public HRegionLocation relocateRegion(byte[] tableName, byte[] row)
+ throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public boolean tableExists(byte[] tableName) throws MasterNotRunningException {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public void setRootRegionLocation(HRegionLocation rootRegion) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void unsetRootRegionLocation() {
+ // TODO Auto-generated method stub
+
+ }
+}
\ No newline at end of file
Index: src/test/org/apache/hadoop/hbase/master/TestMaster.java
===================================================================
--- src/test/org/apache/hadoop/hbase/master/TestMaster.java (revision 0)
+++ src/test/org/apache/hadoop/hbase/master/TestMaster.java (revision 0)
@@ -0,0 +1,182 @@
+package org.apache.hadoop.hbase.master;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.DatagramSocket;
+import java.net.ServerSocket;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HMsg;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.HServerInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.ipc.HMasterRegionInterface;
+import org.apache.hadoop.hbase.ipc.ServerFactory;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mortbay.log.Log;
+
+/**
+ * Test HMaster.
+ * TODO: How to do random ports.
+ */
+public class TestMaster {
+ private HMaster master;
+ private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private Map regionservers =
+ new HashMap();
+ private static final HRegionInfo [] EMPTY_HRI_ARRAY = new HRegionInfo [] {};
+ private static final HMsg [] EMPTY_HMSG_ARRAY = new HMsg [] {};
+
+
+ @BeforeClass public static void beforeAllTests() throws Exception {
+ TEST_UTIL.startMiniZKCluster();
+ }
+
+ @AfterClass public static void afterAllTests() throws IOException {
+ TEST_UTIL.shutdownMiniZKCluster();
+ }
+
+
+ /**
+ * Starts up a master.
+ * Fakes it out by sending in startup messages from two *servers*. The two
+ * fictional regionservers then fake the master into thinking that they have
+ * deployed .META. and -ROOT- regions.
+ * @throws IOException
+ */
+ @Before
+ public void setup() throws IOException {
+ // Copy config. before I start messing with it
+ HBaseConfiguration c = new HBaseConfiguration(TEST_UTIL.getConfiguration());
+ // Set an alternate to the RPC Server class, one that does not use the
+ // network.
+ c.set(ServerFactory.SERVER_CLASS_KEY,
+ "org.apache.hadoop.hbase.master.NoNetworkServer");
+ // Set an alternate connection class, one that does not use network.
+ c.set(HConnectionManager.CONNECTION_CLASS_KEY,
+ "org.apache.hadoop.hbase.master.NoNetworkConnection");
+ // Set the hbase.rootdir into config.
+ FileSystem fs = FileSystem.get(c);
+ Path hbaseRootdir = fs.makeQualified(fs.getHomeDirectory());
+ c.set(HConstants.HBASE_DIR, hbaseRootdir.toString());
+ c.setInt(HConstants.MASTER_PORT, 0);
+
+
+ // Start up master.
+ this.master = new HMaster(c);
+ this.master.start();
+ // I need to add RegionServers to this connection so it can hook them.
+ NoNetworkConnection connection = (NoNetworkConnection)this.master.connection;
+
+ // Bring up regionservers only have them also not use the network.
+ c.set(HRegionServer.MASTER_CLASS_KEY,
+ "org.apache.hadoop.hbase.master.NoNetworkMasterRegion");
+
+ addRegionServer(c, connection);
+ addRegionServer(c, connection);
+ // -ROOT- and .META. should be assigned at this stage.
+ }
+
+ private HRegionServer addRegionServer(final HBaseConfiguration c,
+ final NoNetworkConnection connection)
+ throws IOException {
+ c.setInt(HRegionServer.REGIONSERVER_PORT, getFreePort());
+ HRegionServer hrs = new TestHRegionServer(c, this.master);
+ connection.add(hrs);
+ HRegionServer.startRegionServer(hrs);
+ this.regionservers.put(hrs.getAddress(), hrs);
+ return hrs;
+ }
+
+ static class TestHRegionServer extends HRegionServer {
+ private final HMaster master;
+ TestHRegionServer(final HBaseConfiguration c, final HMaster m)
+ throws IOException {
+ super(c);
+ this.master = m;
+ }
+
+ @Override
+ protected HMasterRegionInterface setupMasterInterface() {
+ HMasterRegionInterface i = super.setupMasterInterface();
+ ((NoNetworkMasterRegion)i).setMaster(this.master);
+ return i;
+ }
+ }
+
+ @After
+ public void teardown() {
+ for (Map.Entry e: this.regionservers.entrySet()) {
+ e.getValue().stop();
+ Log.info("Stopping regionserver " + e.getKey());
+ }
+ if (this.master != null) this.master.shutdown();
+ }
+
+ /**
+ * @see HBASE-2428
+ */
+ @Test public void testRegionCloseWhenNoMeta() throws Exception {
+ // Push some random regions on to the master to assign.
+ HTableDescriptor htd = new HTableDescriptor("test_table");
+ for(byte[] family: new byte [][] {Bytes.toBytes("one"), Bytes.toBytes("two"),
+ Bytes.toBytes("three")}) {
+ htd.addFamily(new HColumnDescriptor(family));
+ }
+ // Add three regions to assign.
+ HRegionInfo hri = new HRegionInfo(htd, HConstants.EMPTY_START_ROW,
+ Bytes.toBytes("01"));
+ this.master.setUnassigned(hri);
+ hri = new HRegionInfo(htd, Bytes.toBytes("01"), Bytes.toBytes("02"));
+ this.master.setUnassigned(hri);
+ hri = new HRegionInfo(htd, Bytes.toBytes("02"), HConstants.EMPTY_END_ROW);
+ this.master.setUnassigned(hri);
+ }
+
+ /**
+ * Finds a random free port by briefly binding an ephemeral socket.
+ * Modified from the Apache MINA available() method.
+ * @return some random free port.
+ */
+ public static int getFreePort() throws IOException {
+ ServerSocket ss = null;
+ DatagramSocket ds = null;
+ try {
+ ss = new ServerSocket(0);
+ ss.setReuseAddress(true);
+ ds = new DatagramSocket(ss.getLocalPort());
+ ds.setReuseAddress(true);
+ return ss.getLocalPort();
+ } finally {
+ if (ds != null) {
+ ds.close();
+ }
+ if (ss != null) {
+ try {
+ ss.close();
+ } catch (IOException e) {
+ /* should not be thrown */
+ }
+ }
+ }
+ }
+}
Index: src/test/org/apache/hadoop/hbase/master/NoNetworkServer.java
===================================================================
--- src/test/org/apache/hadoop/hbase/master/NoNetworkServer.java (revision 0)
+++ src/test/org/apache/hadoop/hbase/master/NoNetworkServer.java (revision 0)
@@ -0,0 +1,76 @@
+package org.apache.hadoop.hbase.master;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
+import org.apache.hadoop.hbase.ipc.ServerInterface;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * An implementation that does not use the network -- does not do RPC calls.
+ */
+public class NoNetworkServer implements ServerInterface {
+ private final HBaseConfiguration conf;
+ private final InetSocketAddress address;
+
+ public NoNetworkServer(final HBaseConfiguration c, final HServerAddress a) {
+ this.conf = c;
+ this.address = new InetSocketAddress(a.getHostname(), a.getPort());
+ }
+
+ @Override
+ public Writable call(Writable param, long receiveTime) throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public int getCallQueueLen() {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public InetSocketAddress getListenerAddress() {
+ return this.address;
+ }
+
+ @Override
+ public int getNumOpenConnections() {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public void join() throws InterruptedException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void setErrorHandler(HBaseRPCErrorHandler handler) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void setSocketSendBufSize(int size) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void start() {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void stop() {
+ // TODO Auto-generated method stub
+
+ }
+}
Index: src/test/org/apache/hadoop/hbase/TestClusterStateChanges.java
===================================================================
--- src/test/org/apache/hadoop/hbase/TestClusterStateChanges.java (revision 0)
+++ src/test/org/apache/hadoop/hbase/TestClusterStateChanges.java (revision 0)
@@ -0,0 +1,42 @@
+package org.apache.hadoop.hbase;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.junit.*;
+
+import java.io.IOException;
+import java.net.DatagramSocket;
+import java.net.ServerSocket;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertTrue;
+
+/**
+ * Test state transitions on cluster.
+ */
+public class TestClusterStateChanges {
+ private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private static final HRegionInfo [] EMPTY_HRI_ARRAY = new HRegionInfo [] {};
+ private static final HMsg [] EMPTY_HMSG_ARRAY = new HMsg [] {};
+
+
+ @BeforeClass public static void beforeAllTests() throws Exception {
+ TEST_UTIL.startMiniCluster(3);
+ }
+
+ @AfterClass public static void afterAllTests() throws IOException {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ /**
+ * @see HBASE-2428
+ */
+ @Test public void testRegionCloseWhenNoMeta() throws Exception {
+ }
+}
\ No newline at end of file
Index: src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
===================================================================
--- src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 937144)
+++ src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy)
@@ -89,9 +89,10 @@
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;
-import org.apache.hadoop.hbase.ipc.HBaseServer;
import org.apache.hadoop.hbase.ipc.HMasterRegionInterface;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.ipc.ServerFactory;
+import org.apache.hadoop.hbase.ipc.ServerInterface;
import org.apache.hadoop.hbase.regionserver.metrics.RegionServerMetrics;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
@@ -120,6 +121,8 @@
private static final HMsg REPORT_QUIESCED = new HMsg(Type.MSG_REPORT_QUIESCED);
private static final HMsg [] EMPTY_HMSG_ARRAY = new HMsg [] {};
+ public static final String MASTER_CLASS_KEY = "hbase.master.class";
+
// Set when a report to the master comes back with a message asking us to
// shutdown. Also set by call to stop when debugging or running unit tests
// of HRegionServer in isolation. We use AtomicBoolean rather than
@@ -171,7 +174,7 @@
// Server to handle client requests. Default access so can be accessed by
// unit tests.
- HBaseServer server;
+ ServerInterface server;
// Leases
private Leases leases;
@@ -293,9 +296,7 @@
this.shutdownHDFS.set(true);
// Server to handle client requests
- this.server = HBaseRPC.getServer(this, address.getBindAddress(),
- address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10),
- false, conf);
+ this.server = ServerFactory.getServer(this, this.conf, this.address);
this.server.setErrorHandler(this);
// Address is giving a default IP for the moment. Will be changed after
// calling the master.
@@ -375,7 +376,7 @@
} else if (type == EventType.NodeDeleted) {
watchMasterAddress();
} else if (type == EventType.NodeCreated) {
- getMaster();
+ this.hbaseMaster = setupMasterInterface();
// ZooKeeper watches are one time only, so we need to re-register our watch.
watchMasterAddress();
@@ -764,7 +765,13 @@
// Master may have sent us a new address with the other configs.
// Update our address in this case. See HBASE-719
String hra = conf.get("hbase.regionserver.address");
- if (address != null) {
+ // TODO: The below used to be this.address != null. Was broken by what
+ // looks like a mistake in:
+ //
+ // HBASE-1215 migration; metautils scan of meta region was broken; wouldn't see first row
+ // ------------------------------------------------------------------------
+ // r796326 | stack | 2009-07-21 07:40:34 -0700 (Tue, 21 Jul 2009) | 38 lines
+ if (hra != null) {
HServerAddress hsa = new HServerAddress (hra,
this.serverInfo.getServerAddress().getPort());
LOG.info("Master passed us address to use. Was=" +
@@ -1276,11 +1283,11 @@
Threads.shutdown(this.hlogRoller);
}
- private boolean getMaster() {
+ protected HMasterRegionInterface setupMasterInterface() {
HServerAddress masterAddress = null;
while (masterAddress == null) {
if (stopRequested.get()) {
- return false;
+ return null;
}
try {
masterAddress = zooKeeperWrapper.readMasterAddressOrThrow();
@@ -1297,25 +1304,60 @@
try {
// Do initial RPC setup. The final argument indicates that the RPC
// should retry indefinitely.
- master = (HMasterRegionInterface)HBaseRPC.waitForProxy(
- HMasterRegionInterface.class, HBaseRPCProtocolVersion.versionID,
- masterAddress.getInetSocketAddress(),
- this.conf, -1, this.rpcTimeout);
+ master = getMaster(this.conf, masterAddress, this.rpcTimeout);
} catch (IOException e) {
LOG.warn("Unable to connect to master. Retrying. Error was:", e);
sleeper.sleep();
}
}
- this.hbaseMaster = master;
- return true;
+ return master;
}
/*
+ * By default returns an instance of RPC proxy on {@link HMasterRegionInterface}.
+ * To customize, pass the name of an implementation of
+ * {@link HMasterRegionInterface} that has a constructor that takes
+ * {@link HBaseConfiguration} and {@link HServerAddress}.
+ * @param c
+ * @param a
+ * @param rpcTimeout
+ * @return Instance of {@link HMasterRegionInterface}.
+ * @throws IOException
+ */
+ @SuppressWarnings("unchecked")
+ private HMasterRegionInterface getMaster(final HBaseConfiguration c,
+ final HServerAddress a, final long rpcTimeout)
+ throws IOException {
+ String masterClass = c.get(MASTER_CLASS_KEY, null);
+ // If an alternate class supplied, make an instance. Presume that it
+ // has a constructor that takes the Configuration and an HServerAddress.
+ if (masterClass != null) {
+ try {
+ Class cls = Class.forName(masterClass);
+ Constructor constructor =
+ cls.getConstructor(new Class [] {HBaseConfiguration.class, HServerAddress.class});
+ return (HMasterRegionInterface)constructor.newInstance(new Object [] {c, a});
+ } catch (Exception e) {
+ if (e instanceof IOException) throw (IOException)e;
+ else {
+ IOException ioe = new IOException();
+ ioe.initCause(e); throw ioe;
+ }
+ }
+ }
+ return (HMasterRegionInterface)HBaseRPC.waitForProxy(
+ HMasterRegionInterface.class, HBaseRPCProtocolVersion.versionID,
+ a.getInetSocketAddress(), c, -1, rpcTimeout);
+ }
+
+ /*
* Let the master know we're here
* Run initialization using parameters passed us by the master.
*/
private MapWritable reportForDuty() {
- while (!stopRequested.get() && !getMaster()) {
+ while (!stopRequested.get()) {
+ this.hbaseMaster = setupMasterInterface();
+ if (this.hbaseMaster != null) break;
sleeper.sleep();
LOG.warn("Unable to get master for initialization");
}
@@ -2399,10 +2441,90 @@
return fs;
}
+ /**
+ * @return The address of this server.
+ */
+ public HServerAddress getAddress() { return this.address; }
+
+ /** {@inheritDoc} */
+ public long incrementColumnValue(byte [] regionName, byte [] row,
+ byte [] family, byte [] qualifier, long amount, boolean writeToWAL)
+ throws IOException {
+ checkOpen();
+
+ if (regionName == null) {
+ throw new IOException("Invalid arguments to incrementColumnValue " +
+ "regionName is null");
+ }
+ requestCount.incrementAndGet();
+ try {
+ HRegion region = getRegion(regionName);
+ long retval = region.incrementColumnValue(row, family, qualifier, amount,
+ writeToWAL);
+
+ return retval;
+ } catch (IOException e) {
+ checkFileSystem();
+ throw e;
+ }
+ }
+
+ /** {@inheritDoc} */
+ public HRegionInfo[] getRegionsAssignment() throws IOException {
+ HRegionInfo[] regions = new HRegionInfo[onlineRegions.size()];
+ Iterator ite = onlineRegions.values().iterator();
+ for(int i = 0; ite.hasNext(); i++) {
+ regions[i] = ite.next().getRegionInfo();
+ }
+ return regions;
+ }
+
+ /** {@inheritDoc} */
+ public HServerInfo getHServerInfo() throws IOException {
+ return serverInfo;
+ }
+
+ @Override
+ public MultiPutResponse multiPut(MultiPut puts) throws IOException {
+ MultiPutResponse resp = new MultiPutResponse();
+
+ // do each region as it's own.
+ for( Map.Entry> e: puts.puts.entrySet()) {
+ int result = put(e.getKey(), e.getValue().toArray(new Put[]{}));
+ resp.addResult(e.getKey(), result);
+
+ e.getValue().clear(); // clear some RAM
+ }
+
+ return resp;
+ }
+
//
// Main program and support routines
//
+
+ /**
+ * @param hrs
+ * @return Thread the RegionServer is running in correctly named.
+ */
+ public static Thread startRegionServer(final HRegionServer hrs) {
+ return startRegionServer(hrs,
+ "regionserver" + hrs.server.getListenerAddress());
+ }
+ /**
+ * @param hrs
+ * @param name
+ * @return Thread the RegionServer is running in correctly named.
+ */
+ public static Thread startRegionServer(final HRegionServer hrs,
+ final String name) {
+ Thread t = new Thread(hrs);
+ t.setName(name);
+ t.start();
+ return t;
+ }
+
private static void printUsageAndExit() {
printUsageAndExit(null);
}
@@ -2414,7 +2536,7 @@
System.err.println("Usage: java org.apache.hbase.HRegionServer start|stop");
System.exit(0);
}
-
+
/**
* Do class main.
* @param args
@@ -2444,10 +2566,7 @@
}
Constructor extends HRegionServer> c =
regionServerClass.getConstructor(HBaseConfiguration.class);
- HRegionServer hrs = c.newInstance(conf);
- Thread t = new Thread(hrs);
- t.setName("regionserver" + hrs.server.getListenerAddress());
- t.start();
+ startRegionServer(c.newInstance(conf));
}
} catch (Throwable t) {
LOG.error( "Can not start region server because "+
@@ -2466,45 +2585,7 @@
printUsageAndExit();
}
}
-
- /** {@inheritDoc} */
- public long incrementColumnValue(byte [] regionName, byte [] row,
- byte [] family, byte [] qualifier, long amount, boolean writeToWAL)
- throws IOException {
- checkOpen();
- if (regionName == null) {
- throw new IOException("Invalid arguments to incrementColumnValue " +
- "regionName is null");
- }
- requestCount.incrementAndGet();
- try {
- HRegion region = getRegion(regionName);
- long retval = region.incrementColumnValue(row, family, qualifier, amount,
- writeToWAL);
-
- return retval;
- } catch (IOException e) {
- checkFileSystem();
- throw e;
- }
- }
-
- /** {@inheritDoc} */
- public HRegionInfo[] getRegionsAssignment() throws IOException {
- HRegionInfo[] regions = new HRegionInfo[onlineRegions.size()];
- Iterator ite = onlineRegions.values().iterator();
- for(int i = 0; ite.hasNext(); i++) {
- regions[i] = ite.next().getRegionInfo();
- }
- return regions;
- }
-
- /** {@inheritDoc} */
- public HServerInfo getHServerInfo() throws IOException {
- return serverInfo;
- }
-
/**
* @param args
*/
@@ -2516,20 +2597,4 @@
HRegionServer.class);
doMain(args, regionServerClass);
}
-
-
- @Override
- public MultiPutResponse multiPut(MultiPut puts) throws IOException {
- MultiPutResponse resp = new MultiPutResponse();
-
- // do each region as it's own.
- for( Map.Entry> e: puts.puts.entrySet()) {
- int result = put(e.getKey(), e.getValue().toArray(new Put[]{}));
- resp.addResult(e.getKey(), result);
-
- e.getValue().clear(); // clear some RAM
- }
-
- return resp;
- }
}
Index: src/java/org/apache/hadoop/hbase/master/HMaster.java
===================================================================
--- src/java/org/apache/hadoop/hbase/master/HMaster.java (revision 937144)
+++ src/java/org/apache/hadoop/hbase/master/HMaster.java (working copy)
@@ -19,8 +19,8 @@
*/
package org.apache.hadoop.hbase.master;
+import java.io.File;
import java.io.IOException;
-import java.io.File;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.lang.reflect.Constructor;
@@ -33,8 +33,8 @@
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
+import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
@@ -56,9 +56,9 @@
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.LocalHBaseCluster;
import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.TableExistsException;
-import org.apache.hadoop.hbase.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Result;
@@ -66,12 +66,13 @@
import org.apache.hadoop.hbase.client.ServerConnection;
import org.apache.hadoop.hbase.client.ServerConnectionManager;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;
import org.apache.hadoop.hbase.ipc.HBaseServer;
import org.apache.hadoop.hbase.ipc.HMasterInterface;
import org.apache.hadoop.hbase.ipc.HMasterRegionInterface;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.ipc.ServerFactory;
+import org.apache.hadoop.hbase.ipc.ServerInterface;
import org.apache.hadoop.hbase.master.metrics.MasterMetrics;
import org.apache.hadoop.hbase.regionserver.HLog;
import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -80,8 +81,8 @@
import org.apache.hadoop.hbase.util.InfoServer;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Sleeper;
+import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.util.Writables;
-import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
@@ -134,7 +135,7 @@
volatile BlockingQueue toDoQueue =
new PriorityBlockingQueue();
- private final HBaseServer server;
+ private final ServerInterface server;
private final HServerAddress address;
final ServerConnection connection;
@@ -174,8 +175,8 @@
conf.get("hbase.master.dns.nameserver","default"));
addressStr += ":" +
conf.get(MASTER_PORT, Integer.toString(DEFAULT_MASTER_PORT));
- HServerAddress address = new HServerAddress(addressStr);
- LOG.info("My address is " + address);
+ HServerAddress hsa = new HServerAddress(addressStr);
+ LOG.info("My address is " + hsa);
this.conf = conf;
this.rootdir = new Path(conf.get(HBASE_DIR));
@@ -220,9 +221,7 @@
this.maxRegionOpenTime =
conf.getLong("hbase.hbasemaster.maxregionopen", 120 * 1000);
this.leaseTimeout = conf.getInt("hbase.master.lease.period", 120 * 1000);
- this.server = HBaseRPC.getServer(this, address.getBindAddress(),
- address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10),
- false, conf);
+ this.server = ServerFactory.getServer(this, this.conf, hsa);
// The rpc-server port can be ephemeral... ensure we have the correct info
this.address = new HServerAddress(server.getListenerAddress());
@@ -376,6 +375,14 @@
}
/**
+ * Used for testing.
+ * @param hri Region to set unassigned.
+ */
+ void setUnassigned(final HRegionInfo hri) {
+ this.regionManager.setUnassigned(hri, false);
+ }
+
+ /**
* @return Location of the -ROOT- region.
*/
public HServerAddress getRootRegionLocation() {
Index: src/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
===================================================================
--- src/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (revision 937144)
+++ src/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (working copy)
@@ -380,8 +380,8 @@
* Out is not synchronized because only the first thread does this.
*/
private void writeHeader() throws IOException {
- out.write(HBaseServer.HEADER.array());
- out.write(HBaseServer.CURRENT_VERSION);
+ out.write(ServerInterface.HEADER.array());
+ out.write(ServerInterface.CURRENT_VERSION);
//When there are more fields we can have ConnectionHeader Writable.
DataOutputBuffer buf = new DataOutputBuffer();
ObjectWritable.writeObject(buf, remoteId.getTicket(),
Index: src/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
===================================================================
--- src/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (revision 937144)
+++ src/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (working copy)
@@ -29,10 +29,6 @@
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
Index: src/java/org/apache/hadoop/hbase/ipc/ServerInterface.java
===================================================================
--- src/java/org/apache/hadoop/hbase/ipc/ServerInterface.java (revision 0)
+++ src/java/org/apache/hadoop/hbase/ipc/ServerInterface.java (revision 0)
@@ -0,0 +1,81 @@
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Server.
+ * TODO: Don't need this many of the {@link HBaseServer} methods. Cut them down.
+ */
+public interface ServerInterface {
+ /**
+ * The first four bytes of Hadoop RPC connections
+ */
+ public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes());
+ // 1 : Introduce ping and server does not throw away RPCs
+ // 3 : RPC was refactored in 0.19
+ public static final byte CURRENT_VERSION = 3;
+
+ /**
+ * Sets the socket buffer size used for responding to RPCs.
+ *
+ * @param size
+ */
+ public abstract void setSocketSendBufSize(int size);
+
+ /** Starts the service. Must be called before any calls will be handled. */
+ public abstract void start();
+
+ /** Stops the service. No new calls will be handled after this is called. */
+ public abstract void stop();
+
+ /**
+ * Wait for the server to be stopped. Does not wait for all subthreads to
+ * finish. See {@link #stop()}.
+ *
+ * @throws InterruptedException
+ */
+ public abstract void join() throws InterruptedException;
+
+ /**
+ * Return the socket (ip+port) on which the RPC server is listening to.
+ *
+ * @return the socket (ip+port) on which the RPC server is listening to.
+ */
+ public abstract InetSocketAddress getListenerAddress();
+
+ /**
+ * Called for each call.
+ *
+ * @param param
+ * @param receiveTime
+ * @return Writable
+ * @throws IOException
+ */
+ public abstract Writable call(Writable param, long receiveTime)
+ throws IOException;
+
+ /**
+ * The number of open RPC connections
+ *
+ * @return the number of open rpc connections
+ */
+ public abstract int getNumOpenConnections();
+
+ /**
+ * The number of rpc calls in the queue.
+ *
+ * @return The number of rpc calls in the queue.
+ */
+ public abstract int getCallQueueLen();
+
+ /**
+ * Set the handler for calling out of RPC for error conditions.
+ *
+ * @param handler
+ * the handler implementation
+ */
+ public abstract void setErrorHandler(HBaseRPCErrorHandler handler);
+}
\ No newline at end of file
Index: src/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
===================================================================
--- src/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (revision 937144)
+++ src/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (working copy)
@@ -67,25 +67,15 @@
*
* @see HBaseClient
*/
-public abstract class HBaseServer {
+public abstract class HBaseServer implements ServerInterface {
+ public static final Log LOG =
+ LogFactory.getLog("org.apache.hadoop.ipc.HBaseServer");
/**
- * The first four bytes of Hadoop RPC connections
- */
- public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes());
-
- // 1 : Introduce ping and server does not throw away RPCs
- // 3 : RPC was refactored in 0.19
- public static final byte CURRENT_VERSION = 3;
-
- /**
* How many calls/handler are allowed in the queue.
*/
private static final int MAX_QUEUE_SIZE_PER_HANDLER = 100;
- public static final Log LOG =
- LogFactory.getLog("org.apache.hadoop.ipc.HBaseServer");
-
protected static final ThreadLocal SERVER =
new ThreadLocal();
@@ -95,7 +85,7 @@
* the server context.
* @return HBaseServer
*/
- public static HBaseServer get() {
+ public static ServerInterface get() {
return SERVER.get();
}
@@ -1014,12 +1004,14 @@
connection.close();
}
- /** Sets the socket buffer size used for responding to RPCs.
- * @param size
- */
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.hbase.ipc.ServerInterface#setSocketSendBufSize(int)
+ */
public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; }
- /** Starts the service. Must be called before any calls will be handled. */
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.hbase.ipc.ServerInterface#start()
+ */
public synchronized void start() {
responder.start();
listener.start();
@@ -1031,7 +1023,9 @@
}
}
- /** Stops the service. No new calls will be handled after this is called. */
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.hbase.ipc.ServerInterface#stop()
+ */
public synchronized void stop() {
LOG.info("Stopping server on " + port);
running = false;
@@ -1051,54 +1045,45 @@
}
}
- /** Wait for the server to be stopped.
- * Does not wait for all subthreads to finish.
- * See {@link #stop()}.
- * @throws InterruptedException
- */
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.hbase.ipc.ServerInterface#join()
+ */
public synchronized void join() throws InterruptedException {
while (running) {
wait();
}
}
- /**
- * Return the socket (ip+port) on which the RPC server is listening to.
- * @return the socket (ip+port) on which the RPC server is listening to.
- */
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.hbase.ipc.ServerInterface#getListenerAddress()
+ */
public synchronized InetSocketAddress getListenerAddress() {
return listener.getAddress();
}
- /** Called for each call.
- * @param param
- * @param receiveTime
- * @return Writable
- * @throws IOException
- */
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.hbase.ipc.ServerInterface#call(org.apache.hadoop.io.Writable, long)
+ */
public abstract Writable call(Writable param, long receiveTime)
throws IOException;
- /**
- * The number of open RPC conections
- * @return the number of open rpc connections
- */
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.hbase.ipc.ServerInterface#getNumOpenConnections()
+ */
public int getNumOpenConnections() {
return numConnections;
}
- /**
- * The number of rpc calls in the queue.
- * @return The number of rpc calls in the queue.
- */
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.hbase.ipc.ServerInterface#getCallQueueLen()
+ */
public int getCallQueueLen() {
return callQueue.size();
}
- /**
- * Set the handler for calling out of RPC for error conditions.
- * @param handler the handler implementation
- */
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.hbase.ipc.ServerInterface#setErrorHandler(org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler)
+ */
public void setErrorHandler(HBaseRPCErrorHandler handler) {
this.errorHandler = handler;
}
Index: src/java/org/apache/hadoop/hbase/ipc/ServerFactory.java
===================================================================
--- src/java/org/apache/hadoop/hbase/ipc/ServerFactory.java (revision 0)
+++ src/java/org/apache/hadoop/hbase/ipc/ServerFactory.java (revision 0)
@@ -0,0 +1,53 @@
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HServerAddress;
+
+/**
+ * A Server Factory.
+ * If hbase.server.interface.class is set in the passed configuration,
+ * it is presumed to be an implementation of {@link ServerInterface} and that it
+ * has a constructor that takes an HBaseConfiguration and an HServerAddress.
+ * This factory will instantiate an instance of this configured class.
+ * Otherwise it returns result of
+ * {@link HBaseRPC#getServer(Object, String, int, int, boolean, org.apache.hadoop.conf.Configuration)}
+ */
+public class ServerFactory {
+ public static final String SERVER_CLASS_KEY = "hbase.server.interface.class";
+
+ /**
+ * Return a {@link ServerInterface} implementation.
+ * @param instance
+ * @param c Configuration
+ * @param a Address
+ * @return A ServerInterface implementation.
+ * @throws IOException
+ */
+ @SuppressWarnings("unchecked")
+ public static ServerInterface getServer(final Object instance,
+ final HBaseConfiguration c, final HServerAddress a)
+ throws IOException {
+ String serverClass = c.get(SERVER_CLASS_KEY, null);
+ // If an alternate class supplied, make an instance. Presume that it
+ // has a constructor that takes the Configuration and an HServerAddress.
+ if (serverClass != null) {
+ try {
+ Class cls = Class.forName(serverClass);
+ Constructor constructor =
+ cls.getConstructor(new Class [] {HBaseConfiguration.class, HServerAddress.class});
+ return (ServerInterface)constructor.newInstance(new Object [] {c, a});
+ } catch (Exception e) {
+ if (e instanceof IOException) throw (IOException)e;
+ else {
+ IOException ioe = new IOException();
+ throw (IOException)ioe.initCause(e);
+ }
+ }
+ }
+ return HBaseRPC.getServer(instance, a.getBindAddress(), a.getPort(),
+ c.getInt("hbase.regionserver.handler.count", 25), false, c);
+ }
+}
Index: src/java/org/apache/hadoop/hbase/client/HConnectionManager.java
===================================================================
--- src/java/org/apache/hadoop/hbase/client/HConnectionManager.java (revision 937144)
+++ src/java/org/apache/hadoop/hbase/client/HConnectionManager.java (working copy)
@@ -20,6 +20,7 @@
package org.apache.hadoop.hbase.client;
import java.io.IOException;
+import java.lang.reflect.Constructor;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.ArrayList;
import java.util.Collections;
@@ -28,11 +29,11 @@
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
@@ -69,9 +70,12 @@
*
* Used by {@link HTable} and {@link HBaseAdmin}
*/
+@SuppressWarnings("serial")
public class HConnectionManager implements HConstants {
private static final Delete [] DELETE_ARRAY_TYPE = new Delete[0];
private static final Put [] PUT_ARRAY_TYPE = new Put[0];
+ public static final String CONNECTION_CLASS_KEY =
+ "hbase.connection.interface.class";
// Register a shutdown hook, one that cleans up RPC and closes zk sessions.
static {
@@ -114,7 +118,20 @@
* @param conf
* @return HConnection object for the instance specified by the configuration
*/
+ @SuppressWarnings("unchecked")
public static HConnection getConnection(HBaseConfiguration conf) {
+ String className = conf.get(CONNECTION_CLASS_KEY, null);
+ if (className != null) {
+ try {
+ Class cls = Class.forName(className);
+ Constructor constructor =
+ cls.getConstructor(new Class [] {HBaseConfiguration.class});
+ return (HConnection)constructor.newInstance(new Object [] {conf});
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ // Else do default.
TableServers connection;
synchronized (HBASE_INSTANCES) {
connection = HBASE_INSTANCES.get(conf);
@@ -125,7 +142,7 @@
}
return connection;
}
-
+
/**
* Delete connection information for the instance specified by configuration
* @param conf
@@ -942,10 +959,7 @@
server = this.servers.get(regionServer.toString());
if (server == null) { // Get a connection
try {
- server = (HRegionInterface)HBaseRPC.waitForProxy(
- serverInterfaceClass, HBaseRPCProtocolVersion.versionID,
- regionServer.getInetSocketAddress(), this.conf,
- this.maxRPCAttempts, this.rpcTimeout);
+ server = createHRegionServer(regionServer);
} catch (RemoteException e) {
throw RemoteExceptionHandler.decodeRemoteException(e);
}
@@ -954,7 +968,20 @@
}
return server;
}
-
+
+ /*
+ * @param regionServer
+ * @return Implementation of HRegionInterface.
+ * @throws IOException
+ */
+ private HRegionInterface createHRegionServer(final HServerAddress regionServer)
+ throws IOException {
+ return (HRegionInterface) HBaseRPC.waitForProxy(
+ this.serverInterfaceClass, HBaseRPCProtocolVersion.versionID,
+ regionServer.getInetSocketAddress(), this.conf, this.maxRPCAttempts,
+ this.rpcTimeout);
+ }
+
public HRegionInterface getHRegionConnection(
HServerAddress regionServer)
throws IOException {