Index: src/test/org/apache/hadoop/hbase/HBaseTestingUtility.java
===================================================================
--- src/test/org/apache/hadoop/hbase/HBaseTestingUtility.java (revision 937590)
+++ src/test/org/apache/hadoop/hbase/HBaseTestingUtility.java (working copy)
@@ -68,6 +68,7 @@
private MiniDFSCluster dfsCluster = null;
private MiniHBaseCluster hbaseCluster = null;
private MiniMRCluster mrCluster = null;
+ // If non-null, a cluster is already running.
private File clusterTestBuildDir = null;
private HBaseAdmin hbaseAdmin = null;
@@ -98,6 +99,36 @@
}
/**
+ * Home our cluster in a dir under build/test. Give it a random name
+ * so we can have many concurrent clusters running if we need to. Need to
+ * amend the test.build.data System property. It's what minidfscluster bases
+ * its data dir on. Modifying a System property is not the way to do concurrent
+ * instances -- another instance could grab the temporary
+ * value unintentionally -- but there's nothing we can do about it at the
+ * moment; it's how the minidfscluster works.
+ * @return The calculated cluster test build directory.
+ */
+ File setupClusterTestBuildDir() {
+ String oldTestBuildDir =
+ System.getProperty(TEST_DIRECTORY_KEY, "build/test/data");
+ String randomStr = UUID.randomUUID().toString();
+ String dirStr = oldTestBuildDir + "." + randomStr;
+ File dir = new File(dirStr).getAbsoluteFile();
+ // Have it cleaned up on exit
+ dir.deleteOnExit();
+ return dir;
+ }
+
+ /**
+ * @throws IOException If cluster already running.
+ */
+ void isRunningCluster() throws IOException {
+ if (this.clusterTestBuildDir == null) return;
+ throw new IOException("Cluster already running at " +
+ this.clusterTestBuildDir);
+ }
+
+ /**
* @param subdirName
* @return Path to a subdirectory named subdirName under
* {@link #getTestDir()}.
@@ -114,16 +145,35 @@
startMiniCluster(1);
}
+ /**
+ * Call this if you only want a zk cluster.
+ * @see #startMiniCluster(int) if you want zk + dfs + hbase mini cluster.
+ * @throws Exception
+ * @see #shutdownMiniZKCluster()
+ */
public void startMiniZKCluster() throws Exception {
- // Note that this is done before we create the MiniHBaseCluster because we
- // need to edit the config to add the ZooKeeper servers.
+ isRunningCluster();
+ this.clusterTestBuildDir = setupClusterTestBuildDir();
+ startMiniZKCluster(this.clusterTestBuildDir);
+
+ }
+
+ private void startMiniZKCluster(final File dir) throws Exception {
this.zkCluster = new MiniZooKeeperCluster();
- int clientPort = this.zkCluster.startup(this.clusterTestBuildDir);
+ int clientPort = this.zkCluster.startup(dir);
this.conf.set("hbase.zookeeper.property.clientPort",
Integer.toString(clientPort));
}
/**
+ * @throws IOException
+ * @see #startMiniZKCluster()
+ */
+ public void shutdownMiniZKCluster() throws IOException {
+ if (this.zkCluster != null) this.zkCluster.shutdown();
+ }
+
+ /**
* Start up a minicluster of hbase, optinally dfs, and zookeeper.
* Modifies Configuration. Homes the cluster data directory under a random
* subdirectory in a directory under System property test.build.data.
@@ -138,27 +188,13 @@
throws Exception {
LOG.info("Starting up minicluster");
// If we already put up a cluster, fail.
- if (this.clusterTestBuildDir != null) {
- throw new IOException("Cluster already running at " +
- this.clusterTestBuildDir);
- }
- // Now, home our cluster in a dir under build/test. Give it a random name
- // so can have many concurrent clusters running if we need to. Need to
- // amend the test.build.data System property. Its what minidfscluster bases
- // it data dir on. Moding a System property is not the way to do concurrent
- // instances -- another instance could grab the temporary
- // value unintentionally -- but not anything can do about it at moment; its
- // how the minidfscluster works.
- String oldTestBuildDir =
+ isRunningCluster();
+ String oldBuildTestDir =
System.getProperty(TEST_DIRECTORY_KEY, "build/test/data");
- String randomStr = UUID.randomUUID().toString();
- String clusterTestBuildDirStr = oldTestBuildDir + "." + randomStr;
- this.clusterTestBuildDir =
- new File(clusterTestBuildDirStr).getAbsoluteFile();
- // Have it cleaned up on exit
- this.clusterTestBuildDir.deleteOnExit();
+ this.clusterTestBuildDir = setupClusterTestBuildDir();
+
// Set our random dir while minidfscluster is being constructed.
- System.setProperty(TEST_DIRECTORY_KEY, clusterTestBuildDirStr);
+ System.setProperty(TEST_DIRECTORY_KEY, this.clusterTestBuildDir.getPath());
// Bring up mini dfs cluster. This spews a bunch of warnings about missing
// scheme. TODO: fix.
// Complaints are 'Scheme is undefined for build/test/data/dfs/name1'.
@@ -167,7 +203,8 @@
// Restore System property. minidfscluster accesses content of
// the TEST_DIRECTORY_KEY to make bad blocks, a feature we are not using,
// but otherwise, just in constructor.
- System.setProperty(TEST_DIRECTORY_KEY, oldTestBuildDir);
+ System.setProperty(TEST_DIRECTORY_KEY, oldBuildTestDir);
+
// Mangle conf so fs parameter points to minidfs we just started up
FileSystem fs = this.dfsCluster.getFileSystem();
this.conf.set("fs.defaultFS", fs.getUri().toString());
@@ -175,7 +212,7 @@
// It could be created before the cluster
if(this.zkCluster == null) {
- startMiniZKCluster();
+ startMiniZKCluster(this.clusterTestBuildDir);
}
// Now do the mini hbase cluster. Set the hbase.rootdir in config.
@@ -193,7 +230,7 @@
/**
* @throws IOException
- * @see {@link #startMiniCluster(boolean, int)}
+ * @see {@link #startMiniCluster(int)}
*/
public void shutdownMiniCluster() throws IOException {
LOG.info("Shutting down minicluster");
@@ -202,7 +239,7 @@
// Wait till hbase is down before going on to shutdown zk.
this.hbaseCluster.join();
}
- if (this.zkCluster != null) this.zkCluster.shutdown();
+ shutdownMiniZKCluster();
if (this.dfsCluster != null) {
// The below throws an exception per dn, AsynchronousCloseException.
this.dfsCluster.shutdown();
Index: src/test/org/apache/hadoop/hbase/TestRegionRebalancing.java
===================================================================
--- src/test/org/apache/hadoop/hbase/TestRegionRebalancing.java (revision 937590)
+++ src/test/org/apache/hadoop/hbase/TestRegionRebalancing.java (working copy)
@@ -33,6 +33,7 @@
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
/**
* Test whether region rebalancing works. (HBASE-71)
@@ -195,7 +196,7 @@
private List getOnlineRegionServers() {
List list = new ArrayList();
- for (LocalHBaseCluster.RegionServerThread rst : cluster.getRegionThreads()) {
+ for (JVMClusterUtil.RegionServerThread rst : cluster.getRegionThreads()) {
if (rst.getRegionServer().isOnline()) {
list.add(rst.getRegionServer());
}
Index: src/test/org/apache/hadoop/hbase/TestClusterTransitions.java
===================================================================
--- src/test/org/apache/hadoop/hbase/TestClusterTransitions.java (revision 0)
+++ src/test/org/apache/hadoop/hbase/TestClusterTransitions.java (revision 0)
@@ -0,0 +1,131 @@
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.ipc.NoNetworkHServerConnection;
+import org.apache.hadoop.hbase.ipc.ServerFactory;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test cluster transitions.
+ * Uses nonetwork interfaces.
+ */
+public class TestClusterTransitions {
+ private HMaster master;
+ private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private List regionservers =
+ new ArrayList();
+ private static final HRegionInfo [] EMPTY_HRI_ARRAY = new HRegionInfo [] {};
+ private static final HMsg [] EMPTY_HMSG_ARRAY = new HMsg [] {};
+
+
+ @BeforeClass public static void beforeAllTests() throws Exception {
+ // This class does nonetwork connections between master and regionservers
+ // but haven't done the work to make a nonetwork zk yet.
+ TEST_UTIL.startMiniZKCluster();
+ }
+
+ @AfterClass public static void afterAllTests() throws IOException {
+ TEST_UTIL.shutdownMiniZKCluster();
+ }
+
+
+ /**
+ * Starts up a master.
+ * Fakes it out by sending in startup messages from two *servers*. The two
+ * fictional regionservers then fake the master into thinking that they have
+ * deployed .META. and -ROOT- regions.
+ * @throws IOException
+ */
+ @Before
+ public void setup() throws IOException {
+ // Copy config. before I start messing with it
+ HBaseConfiguration c = new HBaseConfiguration(TEST_UTIL.getConfiguration());
+ // Set an alternate to the RPC Server class, one that does not use network.
+ c.set(ServerFactory.SERVER_CLASS_KEY, "org.apache.hadoop.hbase.ipc.NoNetworkServer");
+ // Set an alternate connection class, one that does not use network.
+ c.set(HConnectionManager.CONNECTION_CLASS_KEY,
+ "org.apache.hadoop.hbase.ipc.NoNetworkHServerConnection");
+
+ // Set the hbase.rootdir into config.
+ FileSystem fs = FileSystem.get(c);
+ Path hbaseRootdir = fs.makeQualified(fs.getHomeDirectory());
+ c.set(HConstants.HBASE_DIR, hbaseRootdir.toString());
+ c.setInt(HConstants.MASTER_PORT, 0);
+
+ // Start up master.
+ this.master = new HMaster(c);
+ this.master.start();
+
+ // Get master's connection instance. Add created regionservers to it so it
+ // can do direct invocations against each instance.
+ NoNetworkHServerConnection connection =
+ (NoNetworkHServerConnection)this.master.getServerConnection();
+
+ for (int i = 0; i < 2; i++) {
+ this.regionservers.add(createNoNetworkRegionServer(c, this.master, connection, i));
+ }
+ // Master already started so pass null for it.
+ JVMClusterUtil.startup(null, this.regionservers);
+ }
+
+ /*
+ * @param c
+ * @param m
+ * @param connection
+ * @param port
+ * @return A RegionServerThread with an instance of NoNetworkRegionServer.
+ * @throws IOException
+ */
+ private JVMClusterUtil.RegionServerThread createNoNetworkRegionServer(final HBaseConfiguration c,
+ final HMaster m, final NoNetworkHServerConnection connection, final int port)
+ throws IOException {
+ HRegionServer hrs = new NoNetworkRegionServer(c, m, port);
+ JVMClusterUtil.RegionServerThread rst =
+ new JVMClusterUtil.RegionServerThread(hrs, port);
+ connection.add(hrs);
+ return rst;
+ }
+
+ @After
+ public void teardown() {
+ JVMClusterUtil.shutdown(this.master, regionservers);
+ }
+
+ /**
+ * Test for the HBASE-2428 issue.
+ */
+ @Test public void testRegionCloseWhenNoMeta() throws Exception {
+ /*
+ // Push some random regions on to the master to assign.
+ HTableDescriptor htd = new HTableDescriptor("test_table");
+ for(byte[] family: new byte [][] {Bytes.toBytes("one"), Bytes.toBytes("two"),
+ Bytes.toBytes("three")}) {
+ htd.addFamily(new HColumnDescriptor(family));
+ }
+ // Add three regions to assign.
+ HRegionInfo hri = new HRegionInfo(htd, HConstants.EMPTY_START_ROW,
+ Bytes.toBytes("01"));
+ */
+ int x = 0;
+ /*
+ this.master.setUnassigned(hri);
+ hri = new HRegionInfo(htd, Bytes.toBytes("01"), Bytes.toBytes("02"));
+ this.master.setUnassigned(hri);
+ hri = new HRegionInfo(htd, Bytes.toBytes("02"), HConstants.EMPTY_END_ROW);
+ this.master.setUnassigned(hri);
+ */
+ }
+}
Index: src/test/org/apache/hadoop/hbase/regionserver/DisabledTestRegionServerExit.java
===================================================================
--- src/test/org/apache/hadoop/hbase/regionserver/DisabledTestRegionServerExit.java (revision 937590)
+++ src/test/org/apache/hadoop/hbase/regionserver/DisabledTestRegionServerExit.java (working copy)
@@ -39,6 +39,7 @@
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.RowResult;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
/**
* Tests region server failover when a region server exits both cleanly and
@@ -125,7 +126,7 @@
* is just shut down.
*/
private void stopOrAbortMetaRegionServer(boolean abort) {
- List regionThreads =
+ List regionThreads =
cluster.getRegionThreads();
int server = -1;
Index: src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java
===================================================================
--- src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java (revision 937590)
+++ src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java (working copy)
@@ -30,6 +30,7 @@
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
/**
* This class creates a single process HBase cluster. One thread is created for
@@ -83,7 +84,7 @@
* @return Name of regionserver started.
*/
public String startRegionServer() throws IOException {
- LocalHBaseCluster.RegionServerThread t =
+ JVMClusterUtil.RegionServerThread t =
this.hbaseCluster.addRegionServer();
t.start();
t.waitForServerOnline();
@@ -126,7 +127,7 @@
* @param serverNumber Used as index into a list.
* @return the region server that was stopped
*/
- public LocalHBaseCluster.RegionServerThread stopRegionServer(int serverNumber) {
+ public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber) {
return stopRegionServer(serverNumber, true);
}
@@ -140,9 +141,9 @@
* before end of the test.
* @return the region server that was stopped
*/
- public LocalHBaseCluster.RegionServerThread stopRegionServer(int serverNumber,
+ public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber,
final boolean shutdownFS) {
- LocalHBaseCluster.RegionServerThread server =
+ JVMClusterUtil.RegionServerThread server =
hbaseCluster.getRegionServers().get(serverNumber);
LOG.info("Stopping " + server.toString());
if (!shutdownFS) {
@@ -185,7 +186,7 @@
* @throws IOException
*/
public void flushcache() throws IOException {
- for (LocalHBaseCluster.RegionServerThread t:
+ for (JVMClusterUtil.RegionServerThread t:
this.hbaseCluster.getRegionServers()) {
for(HRegion r: t.getRegionServer().getOnlineRegions()) {
r.flushcache();
@@ -196,7 +197,7 @@
/**
* @return List of region server threads.
*/
- public List getRegionThreads() {
+ public List getRegionThreads() {
return this.hbaseCluster.getRegionServers();
}
Index: src/test/org/apache/hadoop/hbase/NoNetworkRegionServer.java
===================================================================
--- src/test/org/apache/hadoop/hbase/NoNetworkRegionServer.java (revision 0)
+++ src/test/org/apache/hadoop/hbase/NoNetworkRegionServer.java (revision 0)
@@ -0,0 +1,45 @@
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.ipc.HMasterRegionInterface;
+import org.apache.hadoop.hbase.ipc.NoNetworkHMasterRegion;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+
+/**
+ * Subclass so can set no networking implementation of {@link HRegionServer}.
+ */
+public class NoNetworkRegionServer extends HRegionServer {
+ private final HMaster master;
+
+ public NoNetworkRegionServer(final HBaseConfiguration c, final HMaster m,
+ final int port) throws IOException {
+ super(setNoNetworkingHMasterRegion(c, port));
+ this.master = m;
+ }
+
+ /*
+ * @param c
+ *
+ * @param port
+ *
+ * @return A copy of c with HRegionServer.MASTER_CLASS_KEY and
+ * port set.
+ */
+ private static HBaseConfiguration setNoNetworkingHMasterRegion(
+ final HBaseConfiguration c, final int port) {
+ HBaseConfiguration hbc = new HBaseConfiguration(c);
+ hbc.set(HRegionServer.MASTER_CLASS_KEY,
+ "org.apache.hadoop.hbase.ipc.NoNetworkHMasterRegion");
+ hbc.setInt(HRegionServer.REGIONSERVER_PORT, port);
+ return hbc;
+ }
+
+ @Override
+ public HMasterRegionInterface setupMasterInterface() {
+ HMasterRegionInterface i = super.setupMasterInterface();
+ ((NoNetworkHMasterRegion) i).setMaster(this.master);
+ return i;
+ }
+}
\ No newline at end of file
Index: src/test/org/apache/hadoop/hbase/ipc/NoNetworkHServerConnection.java
===================================================================
--- src/test/org/apache/hadoop/hbase/ipc/NoNetworkHServerConnection.java (revision 0)
+++ src/test/org/apache/hadoop/hbase/ipc/NoNetworkHServerConnection.java (revision 0)
@@ -0,0 +1,202 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.ServerCallable;
+import org.apache.hadoop.hbase.client.ServerConnection;
+import org.apache.hadoop.hbase.ipc.HMasterInterface;
+import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
+
+/**
+ * Implementation of {@link ServerConnection} that does not go via the
+ * network; i.e. no RPC'ing.
+ */
+public class NoNetworkHServerConnection implements ServerConnection {
+ private final Map regionservers =
+ new HashMap();
+
+ public NoNetworkHServerConnection(final HBaseConfiguration c) {
+ // Nothing to do.
+ }
+
+ @Override
+ public void clearRegionCache() {
+ // TODO Auto-generated method stub
+ }
+
+ @Override
+ public HRegionInterface getHRegionConnection(HServerAddress regionServer)
+ throws IOException {
+ return getHRegionConnection(regionServer, false);
+ }
+
+ public void add(final HRegionServer rs) {
+ this.regionservers.put(rs.getAddress(), rs);
+ }
+
+ @Override
+ public HRegionInterface getHRegionConnection(HServerAddress regionServer,
+ boolean getMaster)
+ throws IOException {
+ return this.regionservers.get(regionServer);
+ }
+
+ @Override
+ public HTableDescriptor getHTableDescriptor(byte[] tableName)
+ throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public HMasterInterface getMaster() throws MasterNotRunningException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public HRegionLocation getRegionLocation(byte[] tableName, byte[] row,
+ boolean reload)
+ throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public T getRegionServerForWithoutRetries(ServerCallable callable)
+ throws IOException, RuntimeException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public T getRegionServerWithRetries(ServerCallable callable)
+ throws IOException, RuntimeException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public ZooKeeperWrapper getZooKeeperWrapper() throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public boolean isMasterRunning() {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public boolean isTableAvailable(byte[] tableName) throws IOException {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public boolean isTableDisabled(byte[] tableName) throws IOException {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public boolean isTableEnabled(byte[] tableName) throws IOException {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public HTableDescriptor[] listTables() throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public HRegionLocation locateRegion(byte[] tableName, byte[] row)
+ throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public int processBatchOfDeletes(List list, byte[] tableName)
+ throws IOException {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public void processBatchOfPuts(List list, byte[] tableName,
+ ExecutorService pool) throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public int processBatchOfRows(ArrayList list, byte[] tableName)
+ throws IOException {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public HRegionLocation relocateRegion(byte[] tableName, byte[] row)
+ throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public boolean tableExists(byte[] tableName) throws MasterNotRunningException {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public void setRootRegionLocation(HRegionLocation rootRegion) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void unsetRootRegionLocation() {
+ // TODO Auto-generated method stub
+
+ }
+}
\ No newline at end of file
Index: src/test/org/apache/hadoop/hbase/ipc/NoNetworkServer.java
===================================================================
--- src/test/org/apache/hadoop/hbase/ipc/NoNetworkServer.java (revision 0)
+++ src/test/org/apache/hadoop/hbase/ipc/NoNetworkServer.java (revision 0)
@@ -0,0 +1,92 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Implementation of {@link ServerInterface} that does not go via the
+ * network; i.e. no RPC'ing.
+ */
+public class NoNetworkServer implements ServerInterface {
+ private final InetSocketAddress address;
+
+ public NoNetworkServer(final HBaseConfiguration c, final HServerAddress a) {
+ this.address = new InetSocketAddress(a.getHostname(), a.getPort());
+ }
+
+ @Override
+ public Writable call(Writable param, long receiveTime) throws IOException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public int getCallQueueLen() {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public InetSocketAddress getListenerAddress() {
+ return this.address;
+ }
+
+ @Override
+ public int getNumOpenConnections() {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Override
+ public void join() throws InterruptedException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void setErrorHandler(HBaseRPCErrorHandler handler) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void setSocketSendBufSize(int size) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void start() {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void stop() {
+ // TODO Auto-generated method stub
+
+ }
+}
Index: src/test/org/apache/hadoop/hbase/ipc/NoNetworkHMasterRegion.java
===================================================================
--- src/test/org/apache/hadoop/hbase/ipc/NoNetworkHMasterRegion.java (revision 0)
+++ src/test/org/apache/hadoop/hbase/ipc/NoNetworkHMasterRegion.java (revision 0)
@@ -0,0 +1,74 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HMsg;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.HServerInfo;
+import org.apache.hadoop.hbase.ipc.HMasterRegionInterface;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.io.MapWritable;
+
+/**
+ * Implementation of {@link HMasterRegionInterface} that does not go via the
+ * network; i.e. no RPC'ing but invokes methods on master directly. You must
+ * call {@link #setMaster(HMaster)}, just after {@link HRegionServer#setupMasterInterface()},
+ * so this method has a Master to pass on the invocations to.
+ */
+public class NoNetworkHMasterRegion implements HMasterRegionInterface {
+ private HMaster master;
+
+ public NoNetworkHMasterRegion(final HBaseConfiguration c,
+ final HServerAddress a) {
+ // Nothing to do w/ params.
+ }
+
+ /**
+ * Call just after {@link HRegionServer#setupMasterInterface()} else NPEs,
+ * because this instance has no {@link HMaster} to work with.
+ * @param m
+ */
+ public void setMaster(final HMaster m) {
+ this.master = m;
+ }
+
+ @Override
+ public HMsg[] regionServerReport(HServerInfo info, HMsg[] msgs,
+ HRegionInfo[] mostLoadedRegions)
+ throws IOException {
+ return this.master.regionServerReport(info, msgs, mostLoadedRegions);
+ }
+
+ @Override
+ public MapWritable regionServerStartup(HServerInfo info) throws IOException {
+ return this.master.regionServerStartup(info);
+ }
+
+ @Override
+ public long getProtocolVersion(String protocol, long clientVersion)
+ throws IOException {
+ return this.master.getProtocolVersion(protocol, clientVersion);
+ }
+}
\ No newline at end of file
Index: src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
===================================================================
--- src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 937590)
+++ src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy)
@@ -26,6 +26,7 @@
import java.lang.management.RuntimeMXBean;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
+import java.lang.reflect.Proxy;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
@@ -78,20 +79,21 @@
import org.apache.hadoop.hbase.Leases.LeaseStillHeldException;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.MultiPut;
+import org.apache.hadoop.hbase.client.MultiPutResponse;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ServerConnection;
import org.apache.hadoop.hbase.client.ServerConnectionManager;
-import org.apache.hadoop.hbase.client.MultiPutResponse;
-import org.apache.hadoop.hbase.client.MultiPut;
import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;
-import org.apache.hadoop.hbase.ipc.HBaseServer;
import org.apache.hadoop.hbase.ipc.HMasterRegionInterface;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.ipc.ServerFactory;
+import org.apache.hadoop.hbase.ipc.ServerInterface;
import org.apache.hadoop.hbase.regionserver.metrics.RegionServerMetrics;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
@@ -120,6 +122,8 @@
private static final HMsg REPORT_QUIESCED = new HMsg(Type.MSG_REPORT_QUIESCED);
private static final HMsg [] EMPTY_HMSG_ARRAY = new HMsg [] {};
+ public static final String MASTER_CLASS_KEY = "hbase.master.class";
+
// Set when a report to the master comes back with a message asking us to
// shutdown. Also set by call to stop when debugging or running unit tests
// of HRegionServer in isolation. We use AtomicBoolean rather than
@@ -171,7 +175,7 @@
// Server to handle client requests. Default access so can be accessed by
// unit tests.
- HBaseServer server;
+ ServerInterface server;
// Leases
private Leases leases;
@@ -293,9 +297,7 @@
this.shutdownHDFS.set(true);
// Server to handle client requests
- this.server = HBaseRPC.getServer(this, address.getBindAddress(),
- address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10),
- false, conf);
+ this.server = ServerFactory.getServer(this, this.conf, this.address);
this.server.setErrorHandler(this);
// Address is giving a default IP for the moment. Will be changed after
// calling the master.
@@ -375,7 +377,7 @@
} else if (type == EventType.NodeDeleted) {
watchMasterAddress();
} else if (type == EventType.NodeCreated) {
- getMaster();
+ this.hbaseMaster = setupMasterInterface();
// ZooKeeper watches are one time only, so we need to re-register our watch.
watchMasterAddress();
@@ -671,7 +673,9 @@
serverInfo.getServerAddress().toString());
}
if (this.hbaseMaster != null) {
- HBaseRPC.stopProxy(this.hbaseMaster);
+ if (Proxy.isProxyClass(this.hbaseMaster.getClass())) {
+ HBaseRPC.stopProxy(this.hbaseMaster);
+ }
this.hbaseMaster = null;
}
join();
@@ -728,9 +732,7 @@
* @param dfsShutdownWait
*/
public void runThread(final Thread t, final long dfsShutdownWait) {
- if (t == null) {
- return;
- }
+ if (t == null) return;
t.start();
Threads.shutdown(t, dfsShutdownWait);
}
@@ -742,6 +744,7 @@
* @return Previous occupant of the shutdown thread position.
*/
public Thread setHDFSShutdownThreadOnExit(final Thread t) {
+ if (t == null) this.shutdownHDFS.set(false);
Thread old = this.hdfsShutdownThread;
this.hdfsShutdownThread = t;
return old;
@@ -764,7 +767,13 @@
// Master may have sent us a new address with the other configs.
// Update our address in this case. See HBASE-719
String hra = conf.get("hbase.regionserver.address");
- if (address != null) {
+ // TODO: The below used to be this.address != null. Was broken by what
+ // looks like a mistake in:
+ //
+ // HBASE-1215 migration; metautils scan of meta region was broken; wouldn't see first row
+ // ------------------------------------------------------------------------
+ // r796326 | stack | 2009-07-21 07:40:34 -0700 (Tue, 21 Jul 2009) | 38 lines
+ if (hra != null) {
HServerAddress hsa = new HServerAddress (hra,
this.serverInfo.getServerAddress().getPort());
LOG.info("Master passed us address to use. Was=" +
@@ -1276,11 +1285,16 @@
Threads.shutdown(this.hlogRoller);
}
- private boolean getMaster() {
+ /**
+ * Override to intercept implementation of {@link HMasterRegionInterface}.
+ * @return The instance of {@link HMasterRegionInterface} this {@link HRegionServer}
+ * will use.
+ */
+ public HMasterRegionInterface setupMasterInterface() {
HServerAddress masterAddress = null;
while (masterAddress == null) {
if (stopRequested.get()) {
- return false;
+ return null;
}
try {
masterAddress = zooKeeperWrapper.readMasterAddressOrThrow();
@@ -1297,25 +1311,60 @@
try {
// Do initial RPC setup. The final argument indicates that the RPC
// should retry indefinitely.
- master = (HMasterRegionInterface)HBaseRPC.waitForProxy(
- HMasterRegionInterface.class, HBaseRPCProtocolVersion.versionID,
- masterAddress.getInetSocketAddress(),
- this.conf, -1, this.rpcTimeout);
+ master = getMaster(this.conf, masterAddress, this.rpcTimeout);
} catch (IOException e) {
LOG.warn("Unable to connect to master. Retrying. Error was:", e);
sleeper.sleep();
}
}
- this.hbaseMaster = master;
- return true;
+ return master;
}
/*
+ * By default returns an instance of RPC proxy on {@link HMasterRegionInterface}.
+ * To customize, pass the name of an implementation of
+ * {@link HMasterRegionInterface} that has a constructor that takes
+ * {@link HBaseConfiguration} and {@link HServerAddress}.
+ * @param c
+ * @param a
+ * @param rpcTimeout
+ * @return Instance of {@link HMasterRegionInterface}.
+ * @throws IOException
+ */
+ @SuppressWarnings("unchecked")
+ private HMasterRegionInterface getMaster(final HBaseConfiguration c,
+ final HServerAddress a, final long rpcTimeout)
+ throws IOException {
+ String masterClass = c.get(MASTER_CLASS_KEY, null);
+ // If an alternate class supplied, make an instance. Presume that it
+ // has a constructor that takes the Configuration and an HServerAddress.
+ if (masterClass != null) {
+ try {
+ Class cls = Class.forName(masterClass);
+ Constructor constructor =
+ cls.getConstructor(new Class [] {HBaseConfiguration.class, HServerAddress.class});
+ return (HMasterRegionInterface)constructor.newInstance(new Object [] {c, a});
+ } catch (Exception e) {
+ if (e instanceof IOException) throw (IOException)e;
+ else {
+ IOException ioe = new IOException();
+ ioe.initCause(e);
+ }
+ }
+ }
+ return (HMasterRegionInterface)HBaseRPC.waitForProxy(
+ HMasterRegionInterface.class, HBaseRPCProtocolVersion.versionID,
+ a.getInetSocketAddress(), c, -1, rpcTimeout);
+ }
+
+ /*
* Let the master know we're here
* Run initialization using parameters passed us by the master.
*/
private MapWritable reportForDuty() {
- while (!stopRequested.get() && !getMaster()) {
+ while (!stopRequested.get()) {
+ this.hbaseMaster = setupMasterInterface();
+ if (this.hbaseMaster != null) break;
sleeper.sleep();
LOG.warn("Unable to get master for initialization");
}
@@ -2356,7 +2405,7 @@
}
throw new IOException("Unknown protocol to name node: " + protocol);
}
-
+
/**
* @return Queue to which you can add outbound messages.
*/
@@ -2399,10 +2448,90 @@
return fs;
}
+ /**
+ * @return The address of this server.
+ */
+ public HServerAddress getAddress() { return this.address; }
+
+ /** {@inheritDoc} */
+ public long incrementColumnValue(byte [] regionName, byte [] row,
+ byte [] family, byte [] qualifier, long amount, boolean writeToWAL)
+ throws IOException {
+ checkOpen();
+
+ if (regionName == null) {
+ throw new IOException("Invalid arguments to incrementColumnValue " +
+ "regionName is null");
+ }
+ requestCount.incrementAndGet();
+ try {
+ HRegion region = getRegion(regionName);
+ long retval = region.incrementColumnValue(row, family, qualifier, amount,
+ writeToWAL);
+
+ return retval;
+ } catch (IOException e) {
+ checkFileSystem();
+ throw e;
+ }
+ }
+
+ /** {@inheritDoc} */
+ public HRegionInfo[] getRegionsAssignment() throws IOException {
+ HRegionInfo[] regions = new HRegionInfo[onlineRegions.size()];
+ Iterator<HRegion> ite = onlineRegions.values().iterator();
+ for(int i = 0; ite.hasNext(); i++) {
+ regions[i] = ite.next().getRegionInfo();
+ }
+ return regions;
+ }
+
+ /** {@inheritDoc} */
+ public HServerInfo getHServerInfo() throws IOException {
+ return serverInfo;
+ }
+
+ @Override
+ public MultiPutResponse multiPut(MultiPut puts) throws IOException {
+ MultiPutResponse resp = new MultiPutResponse();
+
+ // do each region as it's own.
+ for( Map.Entry<byte[], List<Put>> e: puts.puts.entrySet()) {
+ int result = put(e.getKey(), e.getValue().toArray(new Put[]{}));
+ resp.addResult(e.getKey(), result);
+
+ e.getValue().clear(); // clear some RAM
+ }
+
+ return resp;
+ }
+
//
// Main program and support routines
//
+
+ /**
+ * @param hrs
+ * @return Thread the RegionServer is running in correctly named.
+ */
+ public static Thread startRegionServer(final HRegionServer hrs) {
+ return startRegionServer(hrs,
+ "regionserver" + hrs.server.getListenerAddress());
+ }
+ /**
+ * @param hrs
+ * @param name
+ * @return Thread the RegionServer is running in correctly named.
+ */
+ public static Thread startRegionServer(final HRegionServer hrs,
+ final String name) {
+ Thread t = new Thread(hrs);
+ t.setName(name);
+ t.start();
+ return t;
+ }
+
private static void printUsageAndExit() {
printUsageAndExit(null);
}
@@ -2414,7 +2543,7 @@
System.err.println("Usage: java org.apache.hbase.HRegionServer start|stop");
System.exit(0);
}
-
+
/**
* Do class main.
* @param args
@@ -2444,10 +2573,7 @@
}
Constructor<? extends HRegionServer> c =
regionServerClass.getConstructor(HBaseConfiguration.class);
- HRegionServer hrs = c.newInstance(conf);
- Thread t = new Thread(hrs);
- t.setName("regionserver" + hrs.server.getListenerAddress());
- t.start();
+ startRegionServer(c.newInstance(conf));
}
} catch (Throwable t) {
LOG.error( "Can not start region server because "+
@@ -2466,45 +2592,7 @@
printUsageAndExit();
}
}
-
- /** {@inheritDoc} */
- public long incrementColumnValue(byte [] regionName, byte [] row,
- byte [] family, byte [] qualifier, long amount, boolean writeToWAL)
- throws IOException {
- checkOpen();
- if (regionName == null) {
- throw new IOException("Invalid arguments to incrementColumnValue " +
- "regionName is null");
- }
- requestCount.incrementAndGet();
- try {
- HRegion region = getRegion(regionName);
- long retval = region.incrementColumnValue(row, family, qualifier, amount,
- writeToWAL);
-
- return retval;
- } catch (IOException e) {
- checkFileSystem();
- throw e;
- }
- }
-
- /** {@inheritDoc} */
- public HRegionInfo[] getRegionsAssignment() throws IOException {
- HRegionInfo[] regions = new HRegionInfo[onlineRegions.size()];
- Iterator<HRegion> ite = onlineRegions.values().iterator();
- for(int i = 0; ite.hasNext(); i++) {
- regions[i] = ite.next().getRegionInfo();
- }
- return regions;
- }
-
- /** {@inheritDoc} */
- public HServerInfo getHServerInfo() throws IOException {
- return serverInfo;
- }
-
/**
* @param args
*/
@@ -2516,20 +2604,4 @@
HRegionServer.class);
doMain(args, regionServerClass);
}
-
-
- @Override
- public MultiPutResponse multiPut(MultiPut puts) throws IOException {
- MultiPutResponse resp = new MultiPutResponse();
-
- // do each region as it's own.
- for( Map.Entry<byte[], List<Put>> e: puts.puts.entrySet()) {
- int result = put(e.getKey(), e.getValue().toArray(new Put[]{}));
- resp.addResult(e.getKey(), result);
-
- e.getValue().clear(); // clear some RAM
- }
-
- return resp;
- }
}
Index: src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java
===================================================================
--- src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java (revision 937590)
+++ src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java (working copy)
@@ -24,6 +24,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -34,6 +35,7 @@
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.Threads;
/**
@@ -59,7 +61,7 @@
public class LocalHBaseCluster implements HConstants {
static final Log LOG = LogFactory.getLog(LocalHBaseCluster.class);
private final HMaster master;
- private final List<RegionServerThread> regionThreads;
+ private final List<JVMClusterUtil.RegionServerThread> regionThreads;
private final static int DEFAULT_NO = 1;
/** local mode */
public static final String LOCAL = "local";
@@ -96,77 +98,33 @@
// port '0' so there won't be clashes over default port as unit tests
// start/stop ports at different times during the life of the test.
conf.set(REGIONSERVER_PORT, "0");
- this.regionThreads = new ArrayList<RegionServerThread>();
+ this.regionThreads =
+ new CopyOnWriteArrayList<JVMClusterUtil.RegionServerThread>();
regionServerClass = (Class<? extends HRegionServer>) conf.getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
for (int i = 0; i < noRegionServers; i++) {
- addRegionServer();
+ addRegionServer(i);
}
}
- /**
- * Creates a region server.
- * Call 'start' on the returned thread to make it run.
- *
- * @throws IOException
- * @return Region server added.
- */
- public RegionServerThread addRegionServer() throws IOException {
- synchronized (regionThreads) {
- HRegionServer server;
- try {
- server = regionServerClass.getConstructor(HBaseConfiguration.class).
- newInstance(conf);
- } catch (Exception e) {
- IOException ioe = new IOException();
- ioe.initCause(e);
- throw ioe;
- }
- RegionServerThread t = new RegionServerThread(server,
- this.regionThreads.size());
- this.regionThreads.add(t);
- return t;
- }
+ public JVMClusterUtil.RegionServerThread addRegionServer() throws IOException {
+ return addRegionServer(this.regionThreads.size());
}
+ public JVMClusterUtil.RegionServerThread addRegionServer(final int index) throws IOException {
+ JVMClusterUtil.RegionServerThread rst = JVMClusterUtil.createRegionServerThread(this.conf,
+ this.regionServerClass, index);
+ this.regionThreads.add(rst);
+ return rst;
+ }
+
/**
* @param serverNumber
* @return region server
*/
public HRegionServer getRegionServer(int serverNumber) {
- synchronized (regionThreads) {
- return regionThreads.get(serverNumber).getRegionServer();
- }
+ return regionThreads.get(serverNumber).getRegionServer();
}
- /** runs region servers */
- public static class RegionServerThread extends Thread {
- private final HRegionServer regionServer;
-
- RegionServerThread(final HRegionServer r, final int index) {
- super(r, "RegionServer:" + index);
- this.regionServer = r;
- }
-
- /** @return the region server */
- public HRegionServer getRegionServer() {
- return this.regionServer;
- }
-
- /**
- * Block until the region server has come online, indicating it is ready
- * to be used.
- */
- public void waitForServerOnline() {
- while (!regionServer.isOnline()) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- // continue waiting
- }
- }
- }
- }
-
/**
* @return the HMaster thread
*/
@@ -177,7 +135,7 @@
/**
* @return Read-only list of region server threads.
*/
- public List<RegionServerThread> getRegionServers() {
+ public List<JVMClusterUtil.RegionServerThread> getRegionServers() {
return Collections.unmodifiableList(this.regionThreads);
}
@@ -188,10 +146,8 @@
* @return Name of region server that just went down.
*/
public String waitOnRegionServer(int serverNumber) {
- RegionServerThread regionServerThread;
- synchronized (regionThreads) {
- regionServerThread = this.regionThreads.remove(serverNumber);
- }
+ JVMClusterUtil.RegionServerThread regionServerThread =
+ this.regionThreads.remove(serverNumber);
while (regionServerThread.isAlive()) {
try {
LOG.info("Waiting on " +
@@ -212,14 +168,12 @@
*/
public void join() {
if (this.regionThreads != null) {
- synchronized(this.regionThreads) {
for(Thread t: this.regionThreads) {
if (t.isAlive()) {
try {
t.join();
- } catch (InterruptedException e) {
- // continue
- }
+ } catch (InterruptedException e) {
+ // continue
}
}
}
@@ -232,92 +186,22 @@
}
}
}
-
+
/**
- * Start the cluster.
- * @return Address to use contacting master.
+ * Startup the mini HBase cluster
*/
- public String startup() {
- this.master.start();
- synchronized (regionThreads) {
- for (RegionServerThread t: this.regionThreads) {
- t.start();
- }
- }
- return this.master.getMasterAddress().toString();
+ public void startup() {
+ JVMClusterUtil.startup(this.master, this.regionThreads);
}
/**
- * Shut down the mini HBase cluster
+ * Shutdown the mini HBase cluster
*/
public void shutdown() {
- LOG.debug("Shutting down HBase Cluster");
- // Be careful how the hdfs shutdown thread runs in context where more than
- // one regionserver in the mix.
- Thread shutdownThread = null;
- synchronized (this.regionThreads) {
- for (RegionServerThread t: this.regionThreads) {
- Thread tt = t.getRegionServer().setHDFSShutdownThreadOnExit(null);
- if (shutdownThread == null && tt != null) {
- shutdownThread = tt;
- }
- }
- }
- if(this.master != null) {
- this.master.shutdown();
- }
- // regionServerThreads can never be null because they are initialized when
- // the class is constructed.
- synchronized(this.regionThreads) {
- for(Thread t: this.regionThreads) {
- if (t.isAlive()) {
- try {
- t.join();
- } catch (InterruptedException e) {
- // continue
- }
- }
- }
- }
- if (this.master != null) {
- while (this.master.isAlive()) {
- try {
- // The below has been replaced to debug sometime hangs on end of
- // tests.
- // this.master.join():
- threadDumpingJoin(this.master);
- } catch(InterruptedException e) {
- // continue
- }
- }
- }
- Threads.shutdown(shutdownThread);
- LOG.info("Shutdown " +
- ((this.regionThreads != null)? this.master.getName(): "0 masters") +
- " " + this.regionThreads.size() + " region server(s)");
+ JVMClusterUtil.shutdown(this.master, this.regionThreads);
}
/**
- * @param t
- * @throws InterruptedException
- */
- public void threadDumpingJoin(final Thread t) throws InterruptedException {
- if (t == null) {
- return;
- }
- long startTime = System.currentTimeMillis();
- while (t.isAlive()) {
- Thread.sleep(1000);
- if (System.currentTimeMillis() - startTime > 60000) {
- startTime = System.currentTimeMillis();
- ReflectionUtils.printThreadInfo(new PrintWriter(System.out),
- "Automatic Stack Trace every 60 seconds waiting on " +
- t.getName());
- }
- }
- }
-
- /**
* @param c Configuration to check.
* @return True if a 'local' address in hbase.master value.
*/
@@ -341,4 +225,4 @@
admin.createTable(htd);
cluster.shutdown();
}
-}
+}
\ No newline at end of file
Index: src/java/org/apache/hadoop/hbase/master/HMaster.java
===================================================================
--- src/java/org/apache/hadoop/hbase/master/HMaster.java (revision 937590)
+++ src/java/org/apache/hadoop/hbase/master/HMaster.java (working copy)
@@ -19,8 +19,8 @@
*/
package org.apache.hadoop.hbase.master;
+import java.io.File;
import java.io.IOException;
-import java.io.File;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.lang.reflect.Constructor;
@@ -33,8 +33,8 @@
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
+import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
@@ -56,9 +56,9 @@
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.LocalHBaseCluster;
import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.TableExistsException;
-import org.apache.hadoop.hbase.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Result;
@@ -66,12 +66,13 @@
import org.apache.hadoop.hbase.client.ServerConnection;
import org.apache.hadoop.hbase.client.ServerConnectionManager;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;
import org.apache.hadoop.hbase.ipc.HBaseServer;
import org.apache.hadoop.hbase.ipc.HMasterInterface;
import org.apache.hadoop.hbase.ipc.HMasterRegionInterface;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.ipc.ServerFactory;
+import org.apache.hadoop.hbase.ipc.ServerInterface;
import org.apache.hadoop.hbase.master.metrics.MasterMetrics;
import org.apache.hadoop.hbase.regionserver.HLog;
import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -80,8 +81,8 @@
import org.apache.hadoop.hbase.util.InfoServer;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Sleeper;
+import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.util.Writables;
-import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
@@ -134,7 +135,7 @@
volatile BlockingQueue toDoQueue =
new PriorityBlockingQueue();
- private final HBaseServer server;
+ private final ServerInterface server;
private final HServerAddress address;
final ServerConnection connection;
@@ -174,8 +175,8 @@
conf.get("hbase.master.dns.nameserver","default"));
addressStr += ":" +
conf.get(MASTER_PORT, Integer.toString(DEFAULT_MASTER_PORT));
- HServerAddress address = new HServerAddress(addressStr);
- LOG.info("My address is " + address);
+ HServerAddress hsa = new HServerAddress(addressStr);
+ LOG.info("My address is " + hsa);
this.conf = conf;
this.rootdir = new Path(conf.get(HBASE_DIR));
@@ -220,9 +221,7 @@
this.maxRegionOpenTime =
conf.getLong("hbase.hbasemaster.maxregionopen", 120 * 1000);
this.leaseTimeout = conf.getInt("hbase.master.lease.period", 120 * 1000);
- this.server = HBaseRPC.getServer(this, address.getBindAddress(),
- address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10),
- false, conf);
+ this.server = ServerFactory.getServer(this, this.conf, hsa);
// The rpc-server port can be ephemeral... ensure we have the correct info
this.address = new HServerAddress(server.getListenerAddress());
@@ -376,6 +375,14 @@
}
/**
+ * Used testing.
+ * @param hri Region to set unassigned.
+ */
+ void setUnassigned(final HRegionInfo hri) {
+ this.regionManager.setUnassigned(hri, false);
+ }
+
+ /**
* @return Location of the -ROOT- region.
*/
public HServerAddress getRootRegionLocation() {
@@ -407,6 +414,13 @@
return this.serverManager;
}
+ /**
+ * @return ServerConnection used by this master.
+ */
+ public ServerConnection getServerConnection() {
+ return this.connection;
+ }
+
/** Main processing loop */
@Override
public void run() {
Index: src/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
===================================================================
--- src/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (revision 937590)
+++ src/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (working copy)
@@ -380,8 +380,8 @@
* Out is not synchronized because only the first thread does this.
*/
private void writeHeader() throws IOException {
- out.write(HBaseServer.HEADER.array());
- out.write(HBaseServer.CURRENT_VERSION);
+ out.write(ServerInterface.HEADER.array());
+ out.write(ServerInterface.CURRENT_VERSION);
//When there are more fields we can have ConnectionHeader Writable.
DataOutputBuffer buf = new DataOutputBuffer();
ObjectWritable.writeObject(buf, remoteId.getTicket(),
Index: src/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java
===================================================================
--- src/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (revision 937590)
+++ src/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (working copy)
@@ -29,10 +29,6 @@
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
Index: src/java/org/apache/hadoop/hbase/ipc/ServerInterface.java
===================================================================
--- src/java/org/apache/hadoop/hbase/ipc/ServerInterface.java (revision 0)
+++ src/java/org/apache/hadoop/hbase/ipc/ServerInterface.java (revision 0)
@@ -0,0 +1,81 @@
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Server.
+ * TODO: Don't need this many of the {@link HBaseServer} methods. Cut them down.
+ */
+public interface ServerInterface {
+ /**
+ * The first four bytes of Hadoop RPC connections
+ */
+ public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes());
+ // 1 : Introduce ping and server does not throw away RPCs
+ // 3 : RPC was refactored in 0.19
+ public static final byte CURRENT_VERSION = 3;
+
+ /**
+ * Sets the socket buffer size used for responding to RPCs.
+ *
+ * @param size
+ */
+ public abstract void setSocketSendBufSize(int size);
+
+ /** Starts the service. Must be called before any calls will be handled. */
+ public abstract void start();
+
+ /** Stops the service. No new calls will be handled after this is called. */
+ public abstract void stop();
+
+ /**
+ * Wait for the server to be stopped. Does not wait for all subthreads to
+ * finish. See {@link #stop()}.
+ *
+ * @throws InterruptedException
+ */
+ public abstract void join() throws InterruptedException;
+
+ /**
+ * Return the socket (ip+port) on which the RPC server is listening to.
+ *
+ * @return the socket (ip+port) on which the RPC server is listening to.
+ */
+ public abstract InetSocketAddress getListenerAddress();
+
+ /**
+ * Called for each call.
+ *
+ * @param param
+ * @param receiveTime
+ * @return Writable
+ * @throws IOException
+ */
+ public abstract Writable call(Writable param, long receiveTime)
+ throws IOException;
+
+ /**
+ * The number of open RPC connections
+ *
+ * @return the number of open rpc connections
+ */
+ public abstract int getNumOpenConnections();
+
+ /**
+ * The number of rpc calls in the queue.
+ *
+ * @return The number of rpc calls in the queue.
+ */
+ public abstract int getCallQueueLen();
+
+ /**
+ * Set the handler for calling out of RPC for error conditions.
+ *
+ * @param handler
+ * the handler implementation
+ */
+ public abstract void setErrorHandler(HBaseRPCErrorHandler handler);
+}
\ No newline at end of file
Index: src/java/org/apache/hadoop/hbase/ipc/HBaseServer.java
===================================================================
--- src/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (revision 937590)
+++ src/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (working copy)
@@ -67,25 +67,15 @@
*
* @see HBaseClient
*/
-public abstract class HBaseServer {
+public abstract class HBaseServer implements ServerInterface {
+ public static final Log LOG =
+ LogFactory.getLog("org.apache.hadoop.ipc.HBaseServer");
/**
- * The first four bytes of Hadoop RPC connections
- */
- public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes());
-
- // 1 : Introduce ping and server does not throw away RPCs
- // 3 : RPC was refactored in 0.19
- public static final byte CURRENT_VERSION = 3;
-
- /**
* How many calls/handler are allowed in the queue.
*/
private static final int MAX_QUEUE_SIZE_PER_HANDLER = 100;
- public static final Log LOG =
- LogFactory.getLog("org.apache.hadoop.ipc.HBaseServer");
-
protected static final ThreadLocal SERVER =
new ThreadLocal();
@@ -95,7 +85,7 @@
* the server context.
* @return HBaseServer
*/
- public static HBaseServer get() {
+ public static ServerInterface get() {
return SERVER.get();
}
@@ -1014,12 +1004,14 @@
connection.close();
}
- /** Sets the socket buffer size used for responding to RPCs.
- * @param size
- */
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.hbase.ipc.ServerInterface#setSocketSendBufSize(int)
+ */
public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; }
- /** Starts the service. Must be called before any calls will be handled. */
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.hbase.ipc.ServerInterface#start()
+ */
public synchronized void start() {
responder.start();
listener.start();
@@ -1031,7 +1023,9 @@
}
}
- /** Stops the service. No new calls will be handled after this is called. */
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.hbase.ipc.ServerInterface#stop()
+ */
public synchronized void stop() {
LOG.info("Stopping server on " + port);
running = false;
@@ -1051,54 +1045,45 @@
}
}
- /** Wait for the server to be stopped.
- * Does not wait for all subthreads to finish.
- * See {@link #stop()}.
- * @throws InterruptedException
- */
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.hbase.ipc.ServerInterface#join()
+ */
public synchronized void join() throws InterruptedException {
while (running) {
wait();
}
}
- /**
- * Return the socket (ip+port) on which the RPC server is listening to.
- * @return the socket (ip+port) on which the RPC server is listening to.
- */
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.hbase.ipc.ServerInterface#getListenerAddress()
+ */
public synchronized InetSocketAddress getListenerAddress() {
return listener.getAddress();
}
- /** Called for each call.
- * @param param
- * @param receiveTime
- * @return Writable
- * @throws IOException
- */
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.hbase.ipc.ServerInterface#call(org.apache.hadoop.io.Writable, long)
+ */
public abstract Writable call(Writable param, long receiveTime)
throws IOException;
- /**
- * The number of open RPC conections
- * @return the number of open rpc connections
- */
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.hbase.ipc.ServerInterface#getNumOpenConnections()
+ */
public int getNumOpenConnections() {
return numConnections;
}
- /**
- * The number of rpc calls in the queue.
- * @return The number of rpc calls in the queue.
- */
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.hbase.ipc.ServerInterface#getCallQueueLen()
+ */
public int getCallQueueLen() {
return callQueue.size();
}
- /**
- * Set the handler for calling out of RPC for error conditions.
- * @param handler the handler implementation
- */
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.hbase.ipc.ServerInterface#setErrorHandler(org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler)
+ */
public void setErrorHandler(HBaseRPCErrorHandler handler) {
this.errorHandler = handler;
}
Index: src/java/org/apache/hadoop/hbase/ipc/ServerFactory.java
===================================================================
--- src/java/org/apache/hadoop/hbase/ipc/ServerFactory.java (revision 0)
+++ src/java/org/apache/hadoop/hbase/ipc/ServerFactory.java (revision 0)
@@ -0,0 +1,53 @@
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HServerAddress;
+
+/**
+ * A Server Factory.
+ * If hbase.server.class is set in the passed configuration,
+ * its presumed its an implementation of {@link ServerInterface} and that it
+ * has a constructor that takes an HBaseConfiguration and a HServerAdddress.
+ * This factory will instantiate an instance of this configured class.
+ * Otherwise it returns result of
+ * {@link HBaseRPC#getServer(Object, String, int, int, boolean, org.apache.hadoop.conf.Configuration)}
+ */
+public class ServerFactory {
+ public static final String SERVER_CLASS_KEY = "hbase.server.interface.class";
+
+ /**
+ * Return a {@link ServerInterface} implementation.
+ * @param instance
+ * @param c Configuration
+ * @param a Address
+ * @return An ServerInterface implementation.
+ * @throws IOException
+ */
+ @SuppressWarnings("unchecked")
+ public static ServerInterface getServer(final Object instance,
+ final HBaseConfiguration c, final HServerAddress a)
+ throws IOException {
+ String serverClass = c.get(SERVER_CLASS_KEY, null);
+ // If an alternate class supplied, make an instance. Presume that it
+ // has a constructor that takes the Configuration and an HServerAddress.
+ if (serverClass != null) {
+ try {
+ Class cls = Class.forName(serverClass);
+ Constructor constructor =
+ cls.getConstructor(new Class [] {HBaseConfiguration.class, HServerAddress.class});
+ return (ServerInterface)constructor.newInstance(new Object [] {c, a});
+ } catch (Exception e) {
+ if (e instanceof IOException) throw (IOException)e;
+ else {
+ IOException ioe = new IOException();
+ ioe.initCause(e);
+ }
+ }
+ }
+ return HBaseRPC.getServer(instance, a.getBindAddress(), a.getPort(),
+ c.getInt("hbase.regionserver.handler.count", 25), false, c);
+ }
+}
Index: src/java/org/apache/hadoop/hbase/util/Threads.java
===================================================================
--- src/java/org/apache/hadoop/hbase/util/Threads.java (revision 937590)
+++ src/java/org/apache/hadoop/hbase/util/Threads.java (working copy)
@@ -17,10 +17,12 @@
*/
package org.apache.hadoop.hbase.util;
+import java.io.PrintWriter;
import java.lang.Thread.UncaughtExceptionHandler;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.ReflectionUtils;
/**
* Thread Utility
@@ -72,6 +74,7 @@
* @param t Thread to shutdown
*/
public static void shutdown(final Thread t, final long joinwait) {
+ if (t == null) return;
while (t.isAlive()) {
try {
t.join(joinwait);
@@ -80,4 +83,27 @@
}
}
}
+
+
+ /**
+ * @param t Waits on the passed thread to die dumping a threaddump every
+ * minute while its up.
+ * @throws InterruptedException
+ */
+ public static void threadDumpingIsAlive(final Thread t)
+ throws InterruptedException {
+ if (t == null) {
+ return;
+ }
+ long startTime = System.currentTimeMillis();
+ while (t.isAlive()) {
+ Thread.sleep(1000);
+ if (System.currentTimeMillis() - startTime > 60000) {
+ startTime = System.currentTimeMillis();
+ ReflectionUtils.printThreadInfo(new PrintWriter(System.out),
+ "Automatic Stack Trace every 60 seconds waiting on " +
+ t.getName());
+ }
+ }
+ }
}
\ No newline at end of file
Index: src/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java
===================================================================
--- src/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java (revision 0)
+++ src/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java (revision 0)
@@ -0,0 +1,156 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+
+/**
+ * Utility used running a cluster all in the one JVM.
+ */
+public class JVMClusterUtil {
+ private static final Log LOG = LogFactory.getLog(JVMClusterUtil.class);
+
+ /**
+ * Datastructure to hold RegionServer Thread and RegionServer instance
+ */
+ public static class RegionServerThread extends Thread {
+ private final HRegionServer regionServer;
+
+ public RegionServerThread(final HRegionServer r, final int index) {
+ super(r, "RegionServer:" + index);
+ this.regionServer = r;
+ }
+
+ /** @return the region server */
+ public HRegionServer getRegionServer() {
+ return this.regionServer;
+ }
+
+ /**
+ * Block until the region server has come online, indicating it is ready
+ * to be used.
+ */
+ public void waitForServerOnline() {
+ while (!regionServer.isOnline()) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ // continue waiting
+ }
+ }
+ }
+ }
+
+ /**
+ * Creates a {@link RegionServerThread}.
+ * Call 'start' on the returned thread to make it run.
+ * @param c Configuration to use.
+ * @param hrsc Class to create.
+ * @param index Used distinguishing the object returned.
+ * @throws IOException
+ * @return Region server added.
+ */
+ public static JVMClusterUtil.RegionServerThread createRegionServerThread(final HBaseConfiguration c,
+ final Class<? extends HRegionServer> hrsc, final int index)
+ throws IOException {
+ HRegionServer server;
+ try {
+ server = hrsc.getConstructor(HBaseConfiguration.class).newInstance(c);
+ } catch (Exception e) {
+ IOException ioe = new IOException();
+ ioe.initCause(e);
+ throw ioe;
+ }
+ return new JVMClusterUtil.RegionServerThread(server, index);
+ }
+
+ /**
+ * Start the cluster.
+ * @param m
+ * @param regionservers
+ * @return Address to use contacting master.
+ */
+ public static String startup(final HMaster m,
+ final List<JVMClusterUtil.RegionServerThread> regionservers) {
+ if (m != null) m.start();
+ if (regionservers != null) {
+ for (JVMClusterUtil.RegionServerThread t: regionservers) {
+ t.start();
+ }
+ }
+ return m == null? null: m.getMasterAddress().toString();
+ }
+
+ /**
+ * @param master
+ * @param regionservers
+ */
+ public static void shutdown(final HMaster master,
+ final List<JVMClusterUtil.RegionServerThread> regionservers) {
+ LOG.debug("Shutting down HBase Cluster");
+ // Be careful how the hdfs shutdown thread runs in context where more than
+ // one regionserver in the mix.
+ Thread hdfsShutdownThread = null;
+ for (JVMClusterUtil.RegionServerThread t: regionservers) {
+ Thread tt = t.getRegionServer().setHDFSShutdownThreadOnExit(null);
+ if (hdfsShutdownThread == null && tt != null) {
+ hdfsShutdownThread = tt;
+ }
+ }
+ if (master != null) {
+ master.shutdown();
+ }
+ // regionServerThreads can never be null because they are initialized when
+ // the class is constructed.
+ for(Thread t: regionservers) {
+ if (t.isAlive()) {
+ try {
+ t.join();
+ } catch (InterruptedException e) {
+ // continue
+ }
+ }
+ }
+ if (master != null) {
+ while (master.isAlive()) {
+ try {
+ // The below has been replaced to debug sometime hangs on end of
+ // tests.
+ // this.master.join():
+ Threads.threadDumpingIsAlive(master);
+ } catch(InterruptedException e) {
+ // continue
+ }
+ }
+ }
+ if (hdfsShutdownThread != null) hdfsShutdownThread.start();
+ Threads.shutdown(hdfsShutdownThread);
+ LOG.info("Shutdown " +
+ ((master != null)? master.getName(): "0 masters") + " and " +
+ ((regionservers == null)? 0: regionservers.size()) + " region server(s)");
+ }
+}
\ No newline at end of file
Index: src/java/org/apache/hadoop/hbase/client/HConnectionManager.java
===================================================================
--- src/java/org/apache/hadoop/hbase/client/HConnectionManager.java (revision 937590)
+++ src/java/org/apache/hadoop/hbase/client/HConnectionManager.java (working copy)
@@ -20,6 +20,7 @@
package org.apache.hadoop.hbase.client;
import java.io.IOException;
+import java.lang.reflect.Constructor;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.ArrayList;
import java.util.Collections;
@@ -28,11 +29,11 @@
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
@@ -69,9 +70,12 @@
*
* Used by {@link HTable} and {@link HBaseAdmin}
*/
+@SuppressWarnings("serial")
public class HConnectionManager implements HConstants {
private static final Delete [] DELETE_ARRAY_TYPE = new Delete[0];
private static final Put [] PUT_ARRAY_TYPE = new Put[0];
+ public static final String CONNECTION_CLASS_KEY =
+ "hbase.connection.interface.class";
// Register a shutdown hook, one that cleans up RPC and closes zk sessions.
static {
@@ -114,7 +118,20 @@
* @param conf
* @return HConnection object for the instance specified by the configuration
*/
+ @SuppressWarnings("unchecked")
public static HConnection getConnection(HBaseConfiguration conf) {
+ String className = conf.get(CONNECTION_CLASS_KEY, null);
+ if (className != null) {
+ try {
+ Class cls = Class.forName(className);
+ Constructor constructor =
+ cls.getConstructor(new Class [] {HBaseConfiguration.class});
+ return (HConnection)constructor.newInstance(new Object [] {conf});
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ // Else do default.
TableServers connection;
synchronized (HBASE_INSTANCES) {
connection = HBASE_INSTANCES.get(conf);
@@ -125,7 +142,7 @@
}
return connection;
}
-
+
/**
* Delete connection information for the instance specified by configuration
* @param conf
@@ -942,10 +959,7 @@
server = this.servers.get(regionServer.toString());
if (server == null) { // Get a connection
try {
- server = (HRegionInterface)HBaseRPC.waitForProxy(
- serverInterfaceClass, HBaseRPCProtocolVersion.versionID,
- regionServer.getInetSocketAddress(), this.conf,
- this.maxRPCAttempts, this.rpcTimeout);
+ server = createHRegionServer(regionServer);
} catch (RemoteException e) {
throw RemoteExceptionHandler.decodeRemoteException(e);
}
@@ -954,7 +968,20 @@
}
return server;
}
-
+
+ /*
+ * @param regionServer
+ * @return Implementation of HRegionInterface.
+ * @throws IOException
+ */
+ private HRegionInterface createHRegionServer(final HServerAddress regionServer)
+ throws IOException {
+ return (HRegionInterface) HBaseRPC.waitForProxy(
+ this.serverInterfaceClass, HBaseRPCProtocolVersion.versionID,
+ regionServer.getInetSocketAddress(), this.conf, this.maxRPCAttempts,
+ this.rpcTimeout);
+ }
+
public HRegionInterface getHRegionConnection(
HServerAddress regionServer)
throws IOException {