Index: src/test/org/apache/hadoop/hbase/HBaseTestingUtility.java =================================================================== --- src/test/org/apache/hadoop/hbase/HBaseTestingUtility.java (revision 937350) +++ src/test/org/apache/hadoop/hbase/HBaseTestingUtility.java (working copy) @@ -68,6 +68,7 @@ private MiniDFSCluster dfsCluster = null; private MiniHBaseCluster hbaseCluster = null; private MiniMRCluster mrCluster = null; + // If non-null, then already a cluster running. private File clusterTestBuildDir = null; private HBaseAdmin hbaseAdmin = null; @@ -98,6 +99,36 @@ } /** + * Home our cluster in a dir under build/test. Give it a random name + * so can have many concurrent clusters running if we need to. Need to + * amend the test.build.data System property. Its what minidfscluster bases + * it data dir on. Moding a System property is not the way to do concurrent + * instances -- another instance could grab the temporary + * value unintentionally -- but not anything can do about it at moment; its + * how the minidfscluster works. + * @return The calculated cluster test build directory. + */ + File setupClusterTestBuildDir() { + String oldTestBuildDir = + System.getProperty(TEST_DIRECTORY_KEY, "build/test/data"); + String randomStr = UUID.randomUUID().toString(); + String dirStr = oldTestBuildDir + "." + randomStr; + File dir = new File(dirStr).getAbsoluteFile(); + // Have it cleaned up on exit + dir.deleteOnExit(); + return dir; + } + + /** + * @throws IOException If cluster already running. + */ + void isRunningCluster() throws IOException { + if (this.clusterTestBuildDir == null) return; + throw new IOException("Cluster already running at " + + this.clusterTestBuildDir); + } + + /** * @param subdirName * @return Path to a subdirectory named subdirName under * {@link #getTestDir()}. @@ -114,16 +145,35 @@ startMiniCluster(1); } + /** + * Call this if you only want a zk cluster. + * @see #startMiniZKCluster() if you want zk + dfs + hbase mini cluster. + * @throws Exception + * @see #shutdownMiniZKCluster() + */ public void startMiniZKCluster() throws Exception { - // Note that this is done before we create the MiniHBaseCluster because we - // need to edit the config to add the ZooKeeper servers. + isRunningCluster(); + this.clusterTestBuildDir = setupClusterTestBuildDir(); + startMiniZKCluster(this.clusterTestBuildDir); + + } + + private void startMiniZKCluster(final File dir) throws Exception { this.zkCluster = new MiniZooKeeperCluster(); - int clientPort = this.zkCluster.startup(this.clusterTestBuildDir); + int clientPort = this.zkCluster.startup(dir); this.conf.set("hbase.zookeeper.property.clientPort", Integer.toString(clientPort)); } /** + * @throws IOException + * @see #startMiniZKCluster() + */ + public void shutdownMiniZKCluster() throws IOException { + if (this.zkCluster != null) this.zkCluster.shutdown(); + } + + /** * Start up a minicluster of hbase, optinally dfs, and zookeeper. * Modifies Configuration. Homes the cluster data directory under a random * subdirectory in a directory under System property test.build.data. @@ -138,27 +188,13 @@ throws Exception { LOG.info("Starting up minicluster"); // If we already put up a cluster, fail. - if (this.clusterTestBuildDir != null) { - throw new IOException("Cluster already running at " + - this.clusterTestBuildDir); - } - // Now, home our cluster in a dir under build/test. Give it a random name - // so can have many concurrent clusters running if we need to. Need to - // amend the test.build.data System property. Its what minidfscluster bases - // it data dir on. Moding a System property is not the way to do concurrent - // instances -- another instance could grab the temporary - // value unintentionally -- but not anything can do about it at moment; its - // how the minidfscluster works. - String oldTestBuildDir = + isRunningCluster(); + String oldBuildTestDir = System.getProperty(TEST_DIRECTORY_KEY, "build/test/data"); - String randomStr = UUID.randomUUID().toString(); - String clusterTestBuildDirStr = oldTestBuildDir + "." + randomStr; - this.clusterTestBuildDir = - new File(clusterTestBuildDirStr).getAbsoluteFile(); - // Have it cleaned up on exit - this.clusterTestBuildDir.deleteOnExit(); + this.clusterTestBuildDir = setupClusterTestBuildDir(); + // Set our random dir while minidfscluster is being constructed. - System.setProperty(TEST_DIRECTORY_KEY, clusterTestBuildDirStr); + System.setProperty(TEST_DIRECTORY_KEY, this.clusterTestBuildDir.getPath()); // Bring up mini dfs cluster. This spews a bunch of warnings about missing // scheme. TODO: fix. // Complaints are 'Scheme is undefined for build/test/data/dfs/name1'. @@ -167,7 +203,8 @@ // Restore System property. minidfscluster accesses content of // the TEST_DIRECTORY_KEY to make bad blocks, a feature we are not using, // but otherwise, just in constructor. - System.setProperty(TEST_DIRECTORY_KEY, oldTestBuildDir); + System.setProperty(TEST_DIRECTORY_KEY, oldBuildTestDir); + // Mangle conf so fs parameter points to minidfs we just started up FileSystem fs = this.dfsCluster.getFileSystem(); this.conf.set("fs.defaultFS", fs.getUri().toString()); @@ -175,7 +212,7 @@ // It could be created before the cluster if(this.zkCluster == null) { - startMiniZKCluster(); + startMiniZKCluster(this.clusterTestBuildDir); } // Now do the mini hbase cluster. Set the hbase.rootdir in config. @@ -193,7 +230,7 @@ /** * @throws IOException - * @see {@link #startMiniCluster(boolean, int)} + * @see {@link #startMiniCluster(int)} */ public void shutdownMiniCluster() throws IOException { LOG.info("Shutting down minicluster"); @@ -202,7 +239,7 @@ // Wait till hbase is down before going on to shutdown zk. this.hbaseCluster.join(); } - if (this.zkCluster != null) this.zkCluster.shutdown(); + shutdownMiniZKCluster(); if (this.dfsCluster != null) { // The below throws an exception per dn, AsynchronousCloseException. this.dfsCluster.shutdown(); Index: src/test/org/apache/hadoop/hbase/TestRegionRebalancing.java =================================================================== --- src/test/org/apache/hadoop/hbase/TestRegionRebalancing.java (revision 937350) +++ src/test/org/apache/hadoop/hbase/TestRegionRebalancing.java (working copy) @@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil; /** * Test whether region rebalancing works. (HBASE-71) @@ -195,7 +196,7 @@ private List getOnlineRegionServers() { List list = new ArrayList(); - for (LocalHBaseCluster.RegionServerThread rst : cluster.getRegionThreads()) { + for (JVMClusterUtil.RegionServerThread rst : cluster.getRegionThreads()) { if (rst.getRegionServer().isOnline()) { list.add(rst.getRegionServer()); } Index: src/test/org/apache/hadoop/hbase/TestClusterTransitions.java =================================================================== --- src/test/org/apache/hadoop/hbase/TestClusterTransitions.java (revision 0) +++ src/test/org/apache/hadoop/hbase/TestClusterTransitions.java (revision 0) @@ -0,0 +1,169 @@ +package org.apache.hadoop.hbase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.ipc.HMasterRegionInterface; +import org.apache.hadoop.hbase.ipc.NoNetworkHMasterRegion; +import org.apache.hadoop.hbase.ipc.NoNetworkHServerConnection; +import org.apache.hadoop.hbase.ipc.ServerFactory; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.util.JVMClusterUtil; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Test cluster transitions. + * Uses nonetwork interfaces. + */ +public class TestClusterTransitions { + private HMaster master; + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private List regionservers = + new ArrayList(); + private static final HRegionInfo [] EMPTY_HRI_ARRAY = new HRegionInfo [] {}; + private static final HMsg [] EMPTY_HMSG_ARRAY = new HMsg [] {}; + + + @BeforeClass public static void beforeAllTests() throws Exception { + // This class does nonetwork connections between master and regionservers + // but haven't done the work to make a nonetwork zk yet. + TEST_UTIL.startMiniZKCluster(); + } + + @AfterClass public static void afterAllTests() throws IOException { + TEST_UTIL.shutdownMiniZKCluster(); + } + + + /** + * Starts up a master. + * Fakes it out by sending in startup messages from two *servers*. The two + * fictional regionservers then fake the master into thinking that they have + * deployed .META. and -ROOT- regions. + * @throws IOException + */ + @Before + public void setup() throws IOException { + // Copy config. before I start messing with it + HBaseConfiguration c = new HBaseConfiguration(TEST_UTIL.getConfiguration()); + // Set an alternate to the RPC Server class, one that does not use network. + c.set(ServerFactory.SERVER_CLASS_KEY, "org.apache.hadoop.hbase.ipc.NoNetworkServer"); + // Set an alternate connection class, one that does not use network. + c.set(HConnectionManager.CONNECTION_CLASS_KEY, + "org.apache.hadoop.hbase.ipc.NoNetworkHServerConnection"); + + // Set the hbase.rootdir into config. + FileSystem fs = FileSystem.get(c); + Path hbaseRootdir = fs.makeQualified(fs.getHomeDirectory()); + c.set(HConstants.HBASE_DIR, hbaseRootdir.toString()); + c.setInt(HConstants.MASTER_PORT, 0); + + // Start up master. + this.master = new HMaster(c); + this.master.start(); + + // Get master's connection instance. Add created regionservers to it so it + // can do direct invocations against each instance. + NoNetworkHServerConnection connection = + (NoNetworkHServerConnection)this.master.getServerConnection(); + + for (int i = 0; i < 2; i++) { + this.regionservers.add(createNoNetworkRegionServer(c, this.master, connection, i)); + } + // Master already started so pass null for it. + JVMClusterUtil.startup(null, this.regionservers); + } + + /* + * @param c + * @param m + * @param connection + * @param port + * @return A RegionServerThread with an instance of NoNetworkRegionServer. + * @throws IOException + */ + private JVMClusterUtil.RegionServerThread createNoNetworkRegionServer(final HBaseConfiguration c, + final HMaster m, final NoNetworkHServerConnection connection, final int port) + throws IOException { + HRegionServer hrs = new NoNetworkRegionServer(c, m, port); + JVMClusterUtil.RegionServerThread rst = + new JVMClusterUtil.RegionServerThread(hrs, port); + connection.add(hrs); + return rst; + } + + /** + * Subclass so can set no networking implementation of {@link HMasterRegionInterface}. + */ + static class NoNetworkRegionServer extends HRegionServer { + private final HMaster master; + + NoNetworkRegionServer(final HBaseConfiguration c, final HMaster m, + final int port) + throws IOException { + super(setNoNetworkingHMasterRegion(c, port)); + this.master = m; + } + + /* + * @param c + * @param port + * @return A copy of c with HRegionServer.MASTER_CLASS_KEY and + * port set. + */ + private static HBaseConfiguration setNoNetworkingHMasterRegion(final HBaseConfiguration c, + final int port) { + HBaseConfiguration hbc = new HBaseConfiguration(c); + hbc.set(HRegionServer.MASTER_CLASS_KEY, + "org.apache.hadoop.hbase.ipc.NoNetworkHMasterRegion"); + hbc.setInt(HRegionServer.REGIONSERVER_PORT, port); + return hbc; + } + + @Override + public HMasterRegionInterface setupMasterInterface() { + HMasterRegionInterface i = super.setupMasterInterface(); + ((NoNetworkHMasterRegion)i).setMaster(this.master); + return i; + } + } + + @After + public void teardown() { + JVMClusterUtil.shutdown(this.master, regionservers); + } + + /** + * @see HBASE-2428 + */ + @Test public void testRegionCloseWhenNoMeta() throws Exception { + /* + // Push some random regions on to the master to assign. + HTableDescriptor htd = new HTableDescriptor("test_table"); + for(byte[] family: new byte [][] {Bytes.toBytes("one"), Bytes.toBytes("two"), + Bytes.toBytes("three")}) { + htd.addFamily(new HColumnDescriptor(family)); + } + // Add three regions to assign. + HRegionInfo hri = new HRegionInfo(htd, HConstants.EMPTY_START_ROW, + Bytes.toBytes("01")); + */ + int x = 0; + /* + this.master.setUnassigned(hri); + hri = new HRegionInfo(htd, Bytes.toBytes("01"), Bytes.toBytes("02")); + this.master.setUnassigned(hri); + hri = new HRegionInfo(htd, Bytes.toBytes("02"), HConstants.EMPTY_END_ROW); + this.master.setUnassigned(hri); + */ + } +} Index: src/test/org/apache/hadoop/hbase/regionserver/DisabledTestRegionServerExit.java =================================================================== --- src/test/org/apache/hadoop/hbase/regionserver/DisabledTestRegionServerExit.java (revision 937350) +++ src/test/org/apache/hadoop/hbase/regionserver/DisabledTestRegionServerExit.java (working copy) @@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.io.BatchUpdate; import org.apache.hadoop.hbase.io.RowResult; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil; /** * Tests region server failover when a region server exits both cleanly and @@ -125,7 +126,7 @@ * is just shut down. */ private void stopOrAbortMetaRegionServer(boolean abort) { - List regionThreads = + List regionThreads = cluster.getRegionThreads(); int server = -1; Index: src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java =================================================================== --- src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java (revision 937350) +++ src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java (working copy) @@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.util.JVMClusterUtil; /** * This class creates a single process HBase cluster. One thread is created for @@ -83,7 +84,7 @@ * @return Name of regionserver started. */ public String startRegionServer() throws IOException { - LocalHBaseCluster.RegionServerThread t = + JVMClusterUtil.RegionServerThread t = this.hbaseCluster.addRegionServer(); t.start(); t.waitForServerOnline(); @@ -126,7 +127,7 @@ * @param serverNumber Used as index into a list. * @return the region server that was stopped */ - public LocalHBaseCluster.RegionServerThread stopRegionServer(int serverNumber) { + public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber) { return stopRegionServer(serverNumber, true); } @@ -140,9 +141,9 @@ * before end of the test. * @return the region server that was stopped */ - public LocalHBaseCluster.RegionServerThread stopRegionServer(int serverNumber, + public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber, final boolean shutdownFS) { - LocalHBaseCluster.RegionServerThread server = + JVMClusterUtil.RegionServerThread server = hbaseCluster.getRegionServers().get(serverNumber); LOG.info("Stopping " + server.toString()); if (!shutdownFS) { @@ -185,7 +186,7 @@ * @throws IOException */ public void flushcache() throws IOException { - for (LocalHBaseCluster.RegionServerThread t: + for (JVMClusterUtil.RegionServerThread t: this.hbaseCluster.getRegionServers()) { for(HRegion r: t.getRegionServer().getOnlineRegions()) { r.flushcache(); @@ -196,7 +197,7 @@ /** * @return List of region server threads. */ - public List getRegionThreads() { + public List getRegionThreads() { return this.hbaseCluster.getRegionServers(); } Index: src/test/org/apache/hadoop/hbase/ipc/NoNetworkHServerConnection.java =================================================================== --- src/test/org/apache/hadoop/hbase/ipc/NoNetworkHServerConnection.java (revision 0) +++ src/test/org/apache/hadoop/hbase/ipc/NoNetworkHServerConnection.java (revision 0) @@ -0,0 +1,202 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; + +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.HServerAddress; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MasterNotRunningException; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.ServerCallable; +import org.apache.hadoop.hbase.client.ServerConnection; +import org.apache.hadoop.hbase.ipc.HMasterInterface; +import org.apache.hadoop.hbase.ipc.HRegionInterface; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper; + +/** + * Implementation of {@link ServerConnection} that does not go via the + * network; i.e. no RPC'ing. + */ +public class NoNetworkHServerConnection implements ServerConnection { + private final Map regionservers = + new HashMap(); + + public NoNetworkHServerConnection(final HBaseConfiguration c) { + // Nothing to do. + } + + @Override + public void clearRegionCache() { + // TODO Auto-generated method stub + } + + @Override + public HRegionInterface getHRegionConnection(HServerAddress regionServer) + throws IOException { + return getHRegionConnection(regionServer, false); + } + + public void add(final HRegionServer rs) { + this.regionservers.put(rs.getAddress(), rs); + } + + @Override + public HRegionInterface getHRegionConnection(HServerAddress regionServer, + boolean getMaster) + throws IOException { + return this.regionservers.get(regionServer); + } + + @Override + public HTableDescriptor getHTableDescriptor(byte[] tableName) + throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public HMasterInterface getMaster() throws MasterNotRunningException { + // TODO Auto-generated method stub + return null; + } + + @Override + public HRegionLocation getRegionLocation(byte[] tableName, byte[] row, + boolean reload) + throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public T getRegionServerForWithoutRetries(ServerCallable callable) + throws IOException, RuntimeException { + // TODO Auto-generated method stub + return null; + } + + @Override + public T getRegionServerWithRetries(ServerCallable callable) + throws IOException, RuntimeException { + // TODO Auto-generated method stub + return null; + } + + @Override + public ZooKeeperWrapper getZooKeeperWrapper() throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public boolean isMasterRunning() { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean isTableAvailable(byte[] tableName) throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean isTableDisabled(byte[] tableName) throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean isTableEnabled(byte[] tableName) throws IOException { + // TODO Auto-generated method stub + return false; + } + + @Override + public HTableDescriptor[] listTables() throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public HRegionLocation locateRegion(byte[] tableName, byte[] row) + throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public int processBatchOfDeletes(List list, byte[] tableName) + throws IOException { + // TODO Auto-generated method stub + return 0; + } + + @Override + public void processBatchOfPuts(List list, byte[] tableName, + ExecutorService pool) throws IOException { + // TODO Auto-generated method stub + + } + + @Override + public int processBatchOfRows(ArrayList list, byte[] tableName) + throws IOException { + // TODO Auto-generated method stub + return 0; + } + + @Override + public HRegionLocation relocateRegion(byte[] tableName, byte[] row) + throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public boolean tableExists(byte[] tableName) throws MasterNotRunningException { + // TODO Auto-generated method stub + return false; + } + + @Override + public void setRootRegionLocation(HRegionLocation rootRegion) { + // TODO Auto-generated method stub + + } + + @Override + public void unsetRootRegionLocation() { + // TODO Auto-generated method stub + + } +} \ No newline at end of file Index: src/test/org/apache/hadoop/hbase/ipc/NoNetworkServer.java =================================================================== --- src/test/org/apache/hadoop/hbase/ipc/NoNetworkServer.java (revision 0) +++ src/test/org/apache/hadoop/hbase/ipc/NoNetworkServer.java (revision 0) @@ -0,0 +1,92 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HServerAddress; +import org.apache.hadoop.io.Writable; + +/** + * Implementation of {@link ServerInterface} that does not go via the + * network; i.e. no RPC'ing. + */ +public class NoNetworkServer implements ServerInterface { + private final InetSocketAddress address; + + public NoNetworkServer(final HBaseConfiguration c, final HServerAddress a) { + this.address = new InetSocketAddress(a.getHostname(), a.getPort()); + } + + @Override + public Writable call(Writable param, long receiveTime) throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public int getCallQueueLen() { + // TODO Auto-generated method stub + return 0; + } + + @Override + public InetSocketAddress getListenerAddress() { + return this.address; + } + + @Override + public int getNumOpenConnections() { + // TODO Auto-generated method stub + return 0; + } + + @Override + public void join() throws InterruptedException { + // TODO Auto-generated method stub + + } + + @Override + public void setErrorHandler(HBaseRPCErrorHandler handler) { + // TODO Auto-generated method stub + + } + + @Override + public void setSocketSendBufSize(int size) { + // TODO Auto-generated method stub + + } + + @Override + public void start() { + // TODO Auto-generated method stub + + } + + @Override + public void stop() { + // TODO Auto-generated method stub + + } +} Index: src/test/org/apache/hadoop/hbase/ipc/NoNetworkHMasterRegion.java =================================================================== --- src/test/org/apache/hadoop/hbase/ipc/NoNetworkHMasterRegion.java (revision 0) +++ src/test/org/apache/hadoop/hbase/ipc/NoNetworkHMasterRegion.java (revision 0) @@ -0,0 +1,74 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.io.IOException; + +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HMsg; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HServerAddress; +import org.apache.hadoop.hbase.HServerInfo; +import org.apache.hadoop.hbase.ipc.HMasterRegionInterface; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.io.MapWritable; + +/** + * Implementation of {@link HMasterRegionInterface} that does not go via the + * network; i.e. no RPC'ing but invokes methods on master directly. You must + * call {@link #setMaster(HMaster)}, just after {@link HRegionServer#setupMasterInterface()}, + * so this method has a Master to pass on the invocations to. + */ +public class NoNetworkHMasterRegion implements HMasterRegionInterface { + private HMaster master; + + public NoNetworkHMasterRegion(final HBaseConfiguration c, + final HServerAddress a) { + // Nothing to do w/ params. + } + + /** + * Call just after {@link HRegionServer#setupMasterInterface()} else NPEs + * because this instance has not {@link HMaster} to work with. + * @param m + */ + public void setMaster(final HMaster m) { + this.master = m; + } + + @Override + public HMsg[] regionServerReport(HServerInfo info, HMsg[] msgs, + HRegionInfo[] mostLoadedRegions) + throws IOException { + return this.master.regionServerReport(info, msgs, mostLoadedRegions); + } + + @Override + public MapWritable regionServerStartup(HServerInfo info) throws IOException { + return this.master.regionServerStartup(info); + } + + @Override + public long getProtocolVersion(String protocol, long clientVersion) + throws IOException { + return this.master.getProtocolVersion(protocol, clientVersion); + } +} \ No newline at end of file Index: src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 937350) +++ src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -89,9 +89,10 @@ import org.apache.hadoop.hbase.ipc.HBaseRPC; import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler; import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion; -import org.apache.hadoop.hbase.ipc.HBaseServer; import org.apache.hadoop.hbase.ipc.HMasterRegionInterface; import org.apache.hadoop.hbase.ipc.HRegionInterface; +import org.apache.hadoop.hbase.ipc.ServerFactory; +import org.apache.hadoop.hbase.ipc.ServerInterface; import org.apache.hadoop.hbase.regionserver.metrics.RegionServerMetrics; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; @@ -120,6 +121,8 @@ private static final HMsg REPORT_QUIESCED = new HMsg(Type.MSG_REPORT_QUIESCED); private static final HMsg [] EMPTY_HMSG_ARRAY = new HMsg [] {}; + public static final String MASTER_CLASS_KEY = "hbase.master.class"; + // Set when a report to the master comes back with a message asking us to // shutdown. Also set by call to stop when debugging or running unit tests // of HRegionServer in isolation. We use AtomicBoolean rather than @@ -171,7 +174,7 @@ // Server to handle client requests. Default access so can be accessed by // unit tests. - HBaseServer server; + ServerInterface server; // Leases private Leases leases; @@ -293,9 +296,7 @@ this.shutdownHDFS.set(true); // Server to handle client requests - this.server = HBaseRPC.getServer(this, address.getBindAddress(), - address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10), - false, conf); + this.server = ServerFactory.getServer(this, this.conf, this.address); this.server.setErrorHandler(this); // Address is giving a default IP for the moment. Will be changed after // calling the master. @@ -375,7 +376,7 @@ } else if (type == EventType.NodeDeleted) { watchMasterAddress(); } else if (type == EventType.NodeCreated) { - getMaster(); + this.hbaseMaster = setupMasterInterface(); // ZooKeeper watches are one time only, so we need to re-register our watch. watchMasterAddress(); @@ -764,7 +765,13 @@ // Master may have sent us a new address with the other configs. // Update our address in this case. See HBASE-719 String hra = conf.get("hbase.regionserver.address"); - if (address != null) { + // TODO: The below used to be this.address != null. Was broken by what + // looks like a mistake in: + // + // HBASE-1215 migration; metautils scan of meta region was broken; wouldn't see first row + // ------------------------------------------------------------------------ + // r796326 | stack | 2009-07-21 07:40:34 -0700 (Tue, 21 Jul 2009) | 38 lines + if (hra != null) { HServerAddress hsa = new HServerAddress (hra, this.serverInfo.getServerAddress().getPort()); LOG.info("Master passed us address to use. Was=" + @@ -1276,11 +1283,16 @@ Threads.shutdown(this.hlogRoller); } - private boolean getMaster() { + /** + * Override to intercept implementation of {@link HMasterRegionInterface}. + * @return The instance of {@link HMasterRegionInterface} this {@link HRegionServer} + * will use. + */ + public HMasterRegionInterface setupMasterInterface() { HServerAddress masterAddress = null; while (masterAddress == null) { if (stopRequested.get()) { - return false; + return null; } try { masterAddress = zooKeeperWrapper.readMasterAddressOrThrow(); @@ -1297,25 +1309,60 @@ try { // Do initial RPC setup. The final argument indicates that the RPC // should retry indefinitely. - master = (HMasterRegionInterface)HBaseRPC.waitForProxy( - HMasterRegionInterface.class, HBaseRPCProtocolVersion.versionID, - masterAddress.getInetSocketAddress(), - this.conf, -1, this.rpcTimeout); + master = getMaster(this.conf, masterAddress, this.rpcTimeout); } catch (IOException e) { LOG.warn("Unable to connect to master. Retrying. Error was:", e); sleeper.sleep(); } } - this.hbaseMaster = master; - return true; + return master; } /* + * By default returns an instance of RPC proxy on {@link HMasterRegionInterface}. + * To customize, pass the name of an implementation of + * {@link HMasterRegionInterface} that has a constructor that takes + * {@link HBaseConfiguration} and {@link HServerAddress}. + * @param c + * @param a + * @param rpcTimeout + * @return Instance of {@link HMasterRegionInterface}. + * @throws IOException + */ + @SuppressWarnings("unchecked") + private HMasterRegionInterface getMaster(final HBaseConfiguration c, + final HServerAddress a, final long rpcTimeout) + throws IOException { + String masterClass = c.get(MASTER_CLASS_KEY, null); + // If an alternate class supplied, make an instance. Presume that it + // has a constructor that takes the Configuration and an HServerAddress. + if (masterClass != null) { + try { + Class cls = Class.forName(masterClass); + Constructor constructor = + cls.getConstructor(new Class [] {HBaseConfiguration.class, HServerAddress.class}); + return (HMasterRegionInterface)constructor.newInstance(new Object [] {c, a}); + } catch (Exception e) { + if (e instanceof IOException) throw (IOException)e; + else { + IOException ioe = new IOException(); + ioe.initCause(e); + } + } + } + return (HMasterRegionInterface)HBaseRPC.waitForProxy( + HMasterRegionInterface.class, HBaseRPCProtocolVersion.versionID, + a.getInetSocketAddress(), c, -1, rpcTimeout); + } + + /* * Let the master know we're here * Run initialization using parameters passed us by the master. */ private MapWritable reportForDuty() { - while (!stopRequested.get() && !getMaster()) { + while (!stopRequested.get()) { + this.hbaseMaster = setupMasterInterface(); + if (this.hbaseMaster != null) break; sleeper.sleep(); LOG.warn("Unable to get master for initialization"); } @@ -2399,10 +2446,90 @@ return fs; } + /** + * @return The address of this server. + */ + public HServerAddress getAddress() { return this.address; } + + /** {@inheritDoc} */ + public long incrementColumnValue(byte [] regionName, byte [] row, + byte [] family, byte [] qualifier, long amount, boolean writeToWAL) + throws IOException { + checkOpen(); + + if (regionName == null) { + throw new IOException("Invalid arguments to incrementColumnValue " + + "regionName is null"); + } + requestCount.incrementAndGet(); + try { + HRegion region = getRegion(regionName); + long retval = region.incrementColumnValue(row, family, qualifier, amount, + writeToWAL); + + return retval; + } catch (IOException e) { + checkFileSystem(); + throw e; + } + } + + /** {@inheritDoc} */ + public HRegionInfo[] getRegionsAssignment() throws IOException { + HRegionInfo[] regions = new HRegionInfo[onlineRegions.size()]; + Iterator ite = onlineRegions.values().iterator(); + for(int i = 0; ite.hasNext(); i++) { + regions[i] = ite.next().getRegionInfo(); + } + return regions; + } + + /** {@inheritDoc} */ + public HServerInfo getHServerInfo() throws IOException { + return serverInfo; + } + + @Override + public MultiPutResponse multiPut(MultiPut puts) throws IOException { + MultiPutResponse resp = new MultiPutResponse(); + + // do each region as it's own. + for( Map.Entry> e: puts.puts.entrySet()) { + int result = put(e.getKey(), e.getValue().toArray(new Put[]{})); + resp.addResult(e.getKey(), result); + + e.getValue().clear(); // clear some RAM + } + + return resp; + } + // // Main program and support routines // + + /** + * @param hrs + * @return Thread the RegionServer is running in correctly named. + */ + public static Thread startRegionServer(final HRegionServer hrs) { + return startRegionServer(hrs, + "regionserver" + hrs.server.getListenerAddress()); + } + /** + * @param hrs + * @param name + * @return Thread the RegionServer is running in correctly named. + */ + public static Thread startRegionServer(final HRegionServer hrs, + final String name) { + Thread t = new Thread(hrs); + t.setName(name); + t.start(); + return t; + } + private static void printUsageAndExit() { printUsageAndExit(null); } @@ -2414,7 +2541,7 @@ System.err.println("Usage: java org.apache.hbase.HRegionServer start|stop"); System.exit(0); } - + /** * Do class main. * @param args @@ -2444,10 +2571,7 @@ } Constructor c = regionServerClass.getConstructor(HBaseConfiguration.class); - HRegionServer hrs = c.newInstance(conf); - Thread t = new Thread(hrs); - t.setName("regionserver" + hrs.server.getListenerAddress()); - t.start(); + startRegionServer(c.newInstance(conf)); } } catch (Throwable t) { LOG.error( "Can not start region server because "+ @@ -2466,45 +2590,7 @@ printUsageAndExit(); } } - - /** {@inheritDoc} */ - public long incrementColumnValue(byte [] regionName, byte [] row, - byte [] family, byte [] qualifier, long amount, boolean writeToWAL) - throws IOException { - checkOpen(); - if (regionName == null) { - throw new IOException("Invalid arguments to incrementColumnValue " + - "regionName is null"); - } - requestCount.incrementAndGet(); - try { - HRegion region = getRegion(regionName); - long retval = region.incrementColumnValue(row, family, qualifier, amount, - writeToWAL); - - return retval; - } catch (IOException e) { - checkFileSystem(); - throw e; - } - } - - /** {@inheritDoc} */ - public HRegionInfo[] getRegionsAssignment() throws IOException { - HRegionInfo[] regions = new HRegionInfo[onlineRegions.size()]; - Iterator ite = onlineRegions.values().iterator(); - for(int i = 0; ite.hasNext(); i++) { - regions[i] = ite.next().getRegionInfo(); - } - return regions; - } - - /** {@inheritDoc} */ - public HServerInfo getHServerInfo() throws IOException { - return serverInfo; - } - /** * @param args */ @@ -2516,20 +2602,4 @@ HRegionServer.class); doMain(args, regionServerClass); } - - - @Override - public MultiPutResponse multiPut(MultiPut puts) throws IOException { - MultiPutResponse resp = new MultiPutResponse(); - - // do each region as it's own. - for( Map.Entry> e: puts.puts.entrySet()) { - int result = put(e.getKey(), e.getValue().toArray(new Put[]{})); - resp.addResult(e.getKey(), result); - - e.getValue().clear(); // clear some RAM - } - - return resp; - } } Index: src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java =================================================================== --- src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java (revision 937350) +++ src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java (working copy) @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -34,6 +35,7 @@ import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.apache.hadoop.hbase.util.Threads; /** @@ -59,7 +61,7 @@ public class LocalHBaseCluster implements HConstants { static final Log LOG = LogFactory.getLog(LocalHBaseCluster.class); private final HMaster master; - private final List regionThreads; + private final List regionThreads; private final static int DEFAULT_NO = 1; /** local mode */ public static final String LOCAL = "local"; @@ -96,77 +98,33 @@ // port '0' so there won't be clashes over default port as unit tests // start/stop ports at different times during the life of the test. conf.set(REGIONSERVER_PORT, "0"); - this.regionThreads = new ArrayList(); + this.regionThreads = + new CopyOnWriteArrayList(); regionServerClass = (Class) conf.getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class); for (int i = 0; i < noRegionServers; i++) { - addRegionServer(); + addRegionServer(i); } } - /** - * Creates a region server. - * Call 'start' on the returned thread to make it run. - * - * @throws IOException - * @return Region server added. - */ - public RegionServerThread addRegionServer() throws IOException { - synchronized (regionThreads) { - HRegionServer server; - try { - server = regionServerClass.getConstructor(HBaseConfiguration.class). - newInstance(conf); - } catch (Exception e) { - IOException ioe = new IOException(); - ioe.initCause(e); - throw ioe; - } - RegionServerThread t = new RegionServerThread(server, - this.regionThreads.size()); - this.regionThreads.add(t); - return t; - } + public JVMClusterUtil.RegionServerThread addRegionServer() throws IOException { + return addRegionServer(this.regionThreads.size()); } + public JVMClusterUtil.RegionServerThread addRegionServer(final int index) throws IOException { + JVMClusterUtil.RegionServerThread rst = JVMClusterUtil.createRegionServerThread(this.conf, + this.regionServerClass, index); + this.regionThreads.add(rst); + return rst; + } + /** * @param serverNumber * @return region server */ public HRegionServer getRegionServer(int serverNumber) { - synchronized (regionThreads) { - return regionThreads.get(serverNumber).getRegionServer(); - } + return regionThreads.get(serverNumber).getRegionServer(); } - /** runs region servers */ - public static class RegionServerThread extends Thread { - private final HRegionServer regionServer; - - RegionServerThread(final HRegionServer r, final int index) { - super(r, "RegionServer:" + index); - this.regionServer = r; - } - - /** @return the region server */ - public HRegionServer getRegionServer() { - return this.regionServer; - } - - /** - * Block until the region server has come online, indicating it is ready - * to be used. - */ - public void waitForServerOnline() { - while (!regionServer.isOnline()) { - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - // continue waiting - } - } - } - } - /** * @return the HMaster thread */ @@ -177,7 +135,7 @@ /** * @return Read-only list of region server threads. */ - public List getRegionServers() { + public List getRegionServers() { return Collections.unmodifiableList(this.regionThreads); } @@ -188,10 +146,8 @@ * @return Name of region server that just went down. */ public String waitOnRegionServer(int serverNumber) { - RegionServerThread regionServerThread; - synchronized (regionThreads) { - regionServerThread = this.regionThreads.remove(serverNumber); - } + JVMClusterUtil.RegionServerThread regionServerThread = + this.regionThreads.remove(serverNumber); while (regionServerThread.isAlive()) { try { LOG.info("Waiting on " + @@ -212,14 +168,12 @@ */ public void join() { if (this.regionThreads != null) { - synchronized(this.regionThreads) { for(Thread t: this.regionThreads) { if (t.isAlive()) { try { t.join(); - } catch (InterruptedException e) { - // continue - } + } catch (InterruptedException e) { + // continue } } } @@ -232,92 +186,22 @@ } } } - + /** - * Start the cluster. - * @return Address to use contacting master. + * Shutdown the mini HBase cluster */ - public String startup() { - this.master.start(); - synchronized (regionThreads) { - for (RegionServerThread t: this.regionThreads) { - t.start(); - } - } - return this.master.getMasterAddress().toString(); + public void startup() { + JVMClusterUtil.startup(this.master, this.regionThreads); } /** - * Shut down the mini HBase cluster + * Shutdown the mini HBase cluster */ public void shutdown() { - LOG.debug("Shutting down HBase Cluster"); - // Be careful how the hdfs shutdown thread runs in context where more than - // one regionserver in the mix. - Thread shutdownThread = null; - synchronized (this.regionThreads) { - for (RegionServerThread t: this.regionThreads) { - Thread tt = t.getRegionServer().setHDFSShutdownThreadOnExit(null); - if (shutdownThread == null && tt != null) { - shutdownThread = tt; - } - } - } - if(this.master != null) { - this.master.shutdown(); - } - // regionServerThreads can never be null because they are initialized when - // the class is constructed. - synchronized(this.regionThreads) { - for(Thread t: this.regionThreads) { - if (t.isAlive()) { - try { - t.join(); - } catch (InterruptedException e) { - // continue - } - } - } - } - if (this.master != null) { - while (this.master.isAlive()) { - try { - // The below has been replaced to debug sometime hangs on end of - // tests. - // this.master.join(): - threadDumpingJoin(this.master); - } catch(InterruptedException e) { - // continue - } - } - } - Threads.shutdown(shutdownThread); - LOG.info("Shutdown " + - ((this.regionThreads != null)? this.master.getName(): "0 masters") + - " " + this.regionThreads.size() + " region server(s)"); + JVMClusterUtil.shutdown(this.master, this.regionThreads); } /** - * @param t - * @throws InterruptedException - */ - public void threadDumpingJoin(final Thread t) throws InterruptedException { - if (t == null) { - return; - } - long startTime = System.currentTimeMillis(); - while (t.isAlive()) { - Thread.sleep(1000); - if (System.currentTimeMillis() - startTime > 60000) { - startTime = System.currentTimeMillis(); - ReflectionUtils.printThreadInfo(new PrintWriter(System.out), - "Automatic Stack Trace every 60 seconds waiting on " + - t.getName()); - } - } - } - - /** * @param c Configuration to check. * @return True if a 'local' address in hbase.master value. */ @@ -341,4 +225,4 @@ admin.createTable(htd); cluster.shutdown(); } -} +} \ No newline at end of file Index: src/java/org/apache/hadoop/hbase/master/HMaster.java =================================================================== --- src/java/org/apache/hadoop/hbase/master/HMaster.java (revision 937350) +++ src/java/org/apache/hadoop/hbase/master/HMaster.java (working copy) @@ -19,8 +19,8 @@ */ package org.apache.hadoop.hbase.master; +import java.io.File; import java.io.IOException; -import java.io.File; import java.lang.management.ManagementFactory; import java.lang.management.RuntimeMXBean; import java.lang.reflect.Constructor; @@ -33,8 +33,8 @@ import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.DelayQueue; +import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; -import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; @@ -56,9 +56,9 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.LocalHBaseCluster; import org.apache.hadoop.hbase.MasterNotRunningException; +import org.apache.hadoop.hbase.MiniZooKeeperCluster; import org.apache.hadoop.hbase.RemoteExceptionHandler; import org.apache.hadoop.hbase.TableExistsException; -import org.apache.hadoop.hbase.MiniZooKeeperCluster; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.Result; @@ -66,12 +66,13 @@ import org.apache.hadoop.hbase.client.ServerConnection; import org.apache.hadoop.hbase.client.ServerConnectionManager; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.ipc.HBaseRPC; import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion; import org.apache.hadoop.hbase.ipc.HBaseServer; import org.apache.hadoop.hbase.ipc.HMasterInterface; import org.apache.hadoop.hbase.ipc.HMasterRegionInterface; import org.apache.hadoop.hbase.ipc.HRegionInterface; +import org.apache.hadoop.hbase.ipc.ServerFactory; +import org.apache.hadoop.hbase.ipc.ServerInterface; import org.apache.hadoop.hbase.master.metrics.MasterMetrics; import org.apache.hadoop.hbase.regionserver.HLog; import org.apache.hadoop.hbase.regionserver.HRegion; @@ -80,8 +81,8 @@ import org.apache.hadoop.hbase.util.InfoServer; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Sleeper; +import org.apache.hadoop.hbase.util.VersionInfo; import org.apache.hadoop.hbase.util.Writables; -import org.apache.hadoop.hbase.util.VersionInfo; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper; import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Text; @@ -134,7 +135,7 @@ volatile BlockingQueue toDoQueue = new PriorityBlockingQueue(); - private final HBaseServer server; + private final ServerInterface server; private final HServerAddress address; final ServerConnection connection; @@ -174,8 +175,8 @@ conf.get("hbase.master.dns.nameserver","default")); addressStr += ":" + conf.get(MASTER_PORT, Integer.toString(DEFAULT_MASTER_PORT)); - HServerAddress address = new HServerAddress(addressStr); - LOG.info("My address is " + address); + HServerAddress hsa = new HServerAddress(addressStr); + LOG.info("My address is " + hsa); this.conf = conf; this.rootdir = new Path(conf.get(HBASE_DIR)); @@ -220,9 +221,7 @@ this.maxRegionOpenTime = conf.getLong("hbase.hbasemaster.maxregionopen", 120 * 1000); this.leaseTimeout = conf.getInt("hbase.master.lease.period", 120 * 1000); - this.server = HBaseRPC.getServer(this, address.getBindAddress(), - address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10), - false, conf); + this.server = ServerFactory.getServer(this, this.conf, hsa); // The rpc-server port can be ephemeral... ensure we have the correct info this.address = new HServerAddress(server.getListenerAddress()); @@ -376,6 +375,14 @@ } /** + * Used testing. + * @param hri Region to set unassigned. + */ + void setUnassigned(final HRegionInfo hri) { + this.regionManager.setUnassigned(hri, false); + } + + /** * @return Location of the -ROOT- region. */ public HServerAddress getRootRegionLocation() { @@ -407,6 +414,13 @@ return this.serverManager; } + /** + * @return ServerConnection used by this master. + */ + public ServerConnection getServerConnection() { + return this.connection; + } + /** Main processing loop */ @Override public void run() { Index: src/java/org/apache/hadoop/hbase/ipc/HBaseClient.java =================================================================== --- src/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (revision 937350) +++ src/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (working copy) @@ -380,8 +380,8 @@ * Out is not synchronized because only the first thread does this. */ private void writeHeader() throws IOException { - out.write(HBaseServer.HEADER.array()); - out.write(HBaseServer.CURRENT_VERSION); + out.write(ServerInterface.HEADER.array()); + out.write(ServerInterface.CURRENT_VERSION); //When there are more fields we can have ConnectionHeader Writable. DataOutputBuffer buf = new DataOutputBuffer(); ObjectWritable.writeObject(buf, remoteId.getTicket(), Index: src/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java =================================================================== --- src/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (revision 937350) +++ src/java/org/apache/hadoop/hbase/ipc/HBaseRPC.java (working copy) @@ -29,10 +29,6 @@ import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.SocketTimeoutException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Comparator; import java.util.HashMap; import java.util.Map; Index: src/java/org/apache/hadoop/hbase/ipc/ServerInterface.java =================================================================== --- src/java/org/apache/hadoop/hbase/ipc/ServerInterface.java (revision 0) +++ src/java/org/apache/hadoop/hbase/ipc/ServerInterface.java (revision 0) @@ -0,0 +1,81 @@ +package org.apache.hadoop.hbase.ipc; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import org.apache.hadoop.io.Writable; + +/** + * Server. + * TODO: Don't need this many of the {@link HBaseServer} methods. Cut them down. + */ +public interface ServerInterface { + /** + * The first four bytes of Hadoop RPC connections + */ + public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes()); + // 1 : Introduce ping and server does not throw away RPCs + // 3 : RPC was refactored in 0.19 + public static final byte CURRENT_VERSION = 3; + + /** + * Sets the socket buffer size used for responding to RPCs. + * + * @param size + */ + public abstract void setSocketSendBufSize(int size); + + /** Starts the service. Must be called before any calls will be handled. */ + public abstract void start(); + + /** Stops the service. No new calls will be handled after this is called. */ + public abstract void stop(); + + /** + * Wait for the server to be stopped. Does not wait for all subthreads to + * finish. See {@link #stop()}. + * + * @throws InterruptedException + */ + public abstract void join() throws InterruptedException; + + /** + * Return the socket (ip+port) on which the RPC server is listening to. + * + * @return the socket (ip+port) on which the RPC server is listening to. + */ + public abstract InetSocketAddress getListenerAddress(); + + /** + * Called for each call. + * + * @param param + * @param receiveTime + * @return Writable + * @throws IOException + */ + public abstract Writable call(Writable param, long receiveTime) + throws IOException; + + /** + * The number of open RPC conections + * + * @return the number of open rpc connections + */ + public abstract int getNumOpenConnections(); + + /** + * The number of rpc calls in the queue. + * + * @return The number of rpc calls in the queue. + */ + public abstract int getCallQueueLen(); + + /** + * Set the handler for calling out of RPC for error conditions. + * + * @param handler + * the handler implementation + */ + public abstract void setErrorHandler(HBaseRPCErrorHandler handler); +} \ No newline at end of file Index: src/java/org/apache/hadoop/hbase/ipc/HBaseServer.java =================================================================== --- src/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (revision 937350) +++ src/java/org/apache/hadoop/hbase/ipc/HBaseServer.java (working copy) @@ -67,25 +67,15 @@ * * @see HBaseClient */ -public abstract class HBaseServer { +public abstract class HBaseServer implements ServerInterface { + public static final Log LOG = + LogFactory.getLog("org.apache.hadoop.ipc.HBaseServer"); /** - * The first four bytes of Hadoop RPC connections - */ - public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes()); - - // 1 : Introduce ping and server does not throw away RPCs - // 3 : RPC was refactored in 0.19 - public static final byte CURRENT_VERSION = 3; - - /** * How many calls/handler are allowed in the queue. */ private static final int MAX_QUEUE_SIZE_PER_HANDLER = 100; - public static final Log LOG = - LogFactory.getLog("org.apache.hadoop.ipc.HBaseServer"); - protected static final ThreadLocal SERVER = new ThreadLocal(); @@ -95,7 +85,7 @@ * the server context. * @return HBaseServer */ - public static HBaseServer get() { + public static ServerInterface get() { return SERVER.get(); } @@ -1014,12 +1004,14 @@ connection.close(); } - /** Sets the socket buffer size used for responding to RPCs. - * @param size - */ + /* (non-Javadoc) + * @see org.apache.hadoop.hbase.ipc.ServerInterface#setSocketSendBufSize(int) + */ public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; } - /** Starts the service. Must be called before any calls will be handled. */ + /* (non-Javadoc) + * @see org.apache.hadoop.hbase.ipc.ServerInterface#start() + */ public synchronized void start() { responder.start(); listener.start(); @@ -1031,7 +1023,9 @@ } } - /** Stops the service. No new calls will be handled after this is called. */ + /* (non-Javadoc) + * @see org.apache.hadoop.hbase.ipc.ServerInterface#stop() + */ public synchronized void stop() { LOG.info("Stopping server on " + port); running = false; @@ -1051,54 +1045,45 @@ } } - /** Wait for the server to be stopped. - * Does not wait for all subthreads to finish. - * See {@link #stop()}. - * @throws InterruptedException - */ + /* (non-Javadoc) + * @see org.apache.hadoop.hbase.ipc.ServerInterface#join() + */ public synchronized void join() throws InterruptedException { while (running) { wait(); } } - /** - * Return the socket (ip+port) on which the RPC server is listening to. - * @return the socket (ip+port) on which the RPC server is listening to. - */ + /* (non-Javadoc) + * @see org.apache.hadoop.hbase.ipc.ServerInterface#getListenerAddress() + */ public synchronized InetSocketAddress getListenerAddress() { return listener.getAddress(); } - /** Called for each call. - * @param param - * @param receiveTime - * @return Writable - * @throws IOException - */ + /* (non-Javadoc) + * @see org.apache.hadoop.hbase.ipc.ServerInterface#call(org.apache.hadoop.io.Writable, long) + */ public abstract Writable call(Writable param, long receiveTime) throws IOException; - /** - * The number of open RPC conections - * @return the number of open rpc connections - */ + /* (non-Javadoc) + * @see org.apache.hadoop.hbase.ipc.ServerInterface#getNumOpenConnections() + */ public int getNumOpenConnections() { return numConnections; } - /** - * The number of rpc calls in the queue. - * @return The number of rpc calls in the queue. - */ + /* (non-Javadoc) + * @see org.apache.hadoop.hbase.ipc.ServerInterface#getCallQueueLen() + */ public int getCallQueueLen() { return callQueue.size(); } - /** - * Set the handler for calling out of RPC for error conditions. - * @param handler the handler implementation - */ + /* (non-Javadoc) + * @see org.apache.hadoop.hbase.ipc.ServerInterface#setErrorHandler(org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler) + */ public void setErrorHandler(HBaseRPCErrorHandler handler) { this.errorHandler = handler; } Index: src/java/org/apache/hadoop/hbase/ipc/ServerFactory.java =================================================================== --- src/java/org/apache/hadoop/hbase/ipc/ServerFactory.java (revision 0) +++ src/java/org/apache/hadoop/hbase/ipc/ServerFactory.java (revision 0) @@ -0,0 +1,53 @@ +package org.apache.hadoop.hbase.ipc; + +import java.io.IOException; +import java.lang.reflect.Constructor; + +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HServerAddress; + +/** + * A Server Factory. + * If hbase.server.class is set in the passed configuration, + * its presumed its an implementation of {@link ServerInterface} and that it + * has a constructor that takes an HBaseConfiguration and a HServerAdddress. + * This factory will instantiate an instance of this configured class. + * Otherwise it returns result of + * {@link HBaseRPC#getServer(Object, String, int, int, boolean, org.apache.hadoop.conf.Configuration)} + */ +public class ServerFactory { + public static final String SERVER_CLASS_KEY = "hbase.server.interface.class"; + + /** + * Return a {@link ServerInterface} implementation. + * @param instance + * @param c Configuration + * @param a Address + * @return An ServerInterface implementation. + * @throws IOException + */ + @SuppressWarnings("unchecked") + public static ServerInterface getServer(final Object instance, + final HBaseConfiguration c, final HServerAddress a) + throws IOException { + String serverClass = c.get(SERVER_CLASS_KEY, null); + // If an alternate class supplied, make an instance. Presume that it + // has a constructor that takes the Configuration and an HServerAddress. + if (serverClass != null) { + try { + Class cls = Class.forName(serverClass); + Constructor constructor = + cls.getConstructor(new Class [] {HBaseConfiguration.class, HServerAddress.class}); + return (ServerInterface)constructor.newInstance(new Object [] {c, a}); + } catch (Exception e) { + if (e instanceof IOException) throw (IOException)e; + else { + IOException ioe = new IOException(); + ioe.initCause(e); + } + } + } + return HBaseRPC.getServer(instance, a.getBindAddress(), a.getPort(), + c.getInt("hbase.regionserver.handler.count", 25), false, c); + } +} Index: src/java/org/apache/hadoop/hbase/util/Threads.java =================================================================== --- src/java/org/apache/hadoop/hbase/util/Threads.java (revision 937350) +++ src/java/org/apache/hadoop/hbase/util/Threads.java (working copy) @@ -17,10 +17,12 @@ */ package org.apache.hadoop.hbase.util; +import java.io.PrintWriter; import java.lang.Thread.UncaughtExceptionHandler; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.util.ReflectionUtils; /** * Thread Utility @@ -72,6 +74,7 @@ * @param t Thread to shutdown */ public static void shutdown(final Thread t, final long joinwait) { + if (t == null) return; while (t.isAlive()) { try { t.join(joinwait); @@ -80,4 +83,27 @@ } } } + + + /** + * @param t Waits on the passed thread to die dumping a threaddump every + * minute while its up. + * @throws InterruptedException + */ + public static void threadDumpingIsAlive(final Thread t) + throws InterruptedException { + if (t == null) { + return; + } + long startTime = System.currentTimeMillis(); + while (t.isAlive()) { + Thread.sleep(1000); + if (System.currentTimeMillis() - startTime > 60000) { + startTime = System.currentTimeMillis(); + ReflectionUtils.printThreadInfo(new PrintWriter(System.out), + "Automatic Stack Trace every 60 seconds waiting on " + + t.getName()); + } + } + } } \ No newline at end of file Index: src/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java =================================================================== --- src/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java (revision 0) +++ src/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java (revision 0) @@ -0,0 +1,155 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.io.IOException; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.regionserver.HRegionServer; + +/** + * Utility used running a cluster all in the one JVM. + */ +public class JVMClusterUtil { + private static final Log LOG = LogFactory.getLog(JVMClusterUtil.class); + + /** + * Datastructure to hold RegionServer Thread and RegionServer instance + */ + public static class RegionServerThread extends Thread { + private final HRegionServer regionServer; + + public RegionServerThread(final HRegionServer r, final int index) { + super(r, "RegionServer:" + index); + this.regionServer = r; + } + + /** @return the region server */ + public HRegionServer getRegionServer() { + return this.regionServer; + } + + /** + * Block until the region server has come online, indicating it is ready + * to be used. + */ + public void waitForServerOnline() { + while (!regionServer.isOnline()) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // continue waiting + } + } + } + } + + /** + * Creates a {@link RegionServerThread}. + * Call 'start' on the returned thread to make it run. + * @param c Configuration to use. + * @param hrsc Class to create. + * @param index Used distingushing the object returned. + * @throws IOException + * @return Region server added. + */ + public static JVMClusterUtil.RegionServerThread createRegionServerThread(final HBaseConfiguration c, + final Class hrsc, final int index) + throws IOException { + HRegionServer server; + try { + server = hrsc.getConstructor(HBaseConfiguration.class).newInstance(c); + } catch (Exception e) { + IOException ioe = new IOException(); + ioe.initCause(e); + throw ioe; + } + return new JVMClusterUtil.RegionServerThread(server, index); + } + + /** + * Start the cluster. + * @param m + * @param regionServers + * @return Address to use contacting master. + */ + public static String startup(final HMaster m, + final List regionservers) { + if (m != null) m.start(); + if (regionservers != null) { + for (JVMClusterUtil.RegionServerThread t: regionservers) { + t.start(); + } + } + return m == null? null: m.getMasterAddress().toString(); + } + + /** + * @param master + * @param regionservers + */ + public static void shutdown(final HMaster master, + final List regionservers) { + LOG.debug("Shutting down HBase Cluster"); + // Be careful how the hdfs shutdown thread runs in context where more than + // one regionserver in the mix. + Thread shutdownThread = null; + for (JVMClusterUtil.RegionServerThread t: regionservers) { + Thread tt = t.getRegionServer().setHDFSShutdownThreadOnExit(null); + if (shutdownThread == null && tt != null) { + shutdownThread = tt; + } + } + if (master != null) { + master.shutdown(); + } + // regionServerThreads can never be null because they are initialized when + // the class is constructed. + for(Thread t: regionservers) { + if (t.isAlive()) { + try { + t.join(); + } catch (InterruptedException e) { + // continue + } + } + } + if (master != null) { + while (master.isAlive()) { + try { + // The below has been replaced to debug sometime hangs on end of + // tests. + // this.master.join(): + Threads.threadDumpingIsAlive(master); + } catch(InterruptedException e) { + // continue + } + } + } + Threads.shutdown(shutdownThread); + LOG.info("Shutdown " + + ((regionservers != null)? master.getName(): "0 masters") + + " " + regionservers.size() + " region server(s)"); + } +} \ No newline at end of file Index: src/java/org/apache/hadoop/hbase/client/HConnectionManager.java =================================================================== --- src/java/org/apache/hadoop/hbase/client/HConnectionManager.java (revision 937350) +++ src/java/org/apache/hadoop/hbase/client/HConnectionManager.java (working copy) @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client; import java.io.IOException; +import java.lang.reflect.Constructor; import java.lang.reflect.UndeclaredThrowableException; import java.util.ArrayList; import java.util.Collections; @@ -28,11 +29,11 @@ import java.util.List; import java.util.Map; import java.util.TreeSet; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; @@ -69,9 +70,12 @@ * * Used by {@link HTable} and {@link HBaseAdmin} */ +@SuppressWarnings("serial") public class HConnectionManager implements HConstants { private static final Delete [] DELETE_ARRAY_TYPE = new Delete[0]; private static final Put [] PUT_ARRAY_TYPE = new Put[0]; + public static final String CONNECTION_CLASS_KEY = + "hbase.connection.interface.class"; // Register a shutdown hook, one that cleans up RPC and closes zk sessions. static { @@ -114,7 +118,20 @@ * @param conf * @return HConnection object for the instance specified by the configuration */ + @SuppressWarnings("unchecked") public static HConnection getConnection(HBaseConfiguration conf) { + String className = conf.get(CONNECTION_CLASS_KEY, null); + if (className != null) { + try { + Class cls = Class.forName(className); + Constructor constructor = + cls.getConstructor(new Class [] {HBaseConfiguration.class}); + return (HConnection)constructor.newInstance(new Object [] {conf}); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + // Else do default. TableServers connection; synchronized (HBASE_INSTANCES) { connection = HBASE_INSTANCES.get(conf); @@ -125,7 +142,7 @@ } return connection; } - + /** * Delete connection information for the instance specified by configuration * @param conf @@ -942,10 +959,7 @@ server = this.servers.get(regionServer.toString()); if (server == null) { // Get a connection try { - server = (HRegionInterface)HBaseRPC.waitForProxy( - serverInterfaceClass, HBaseRPCProtocolVersion.versionID, - regionServer.getInetSocketAddress(), this.conf, - this.maxRPCAttempts, this.rpcTimeout); + server = createHRegionServer(regionServer); } catch (RemoteException e) { throw RemoteExceptionHandler.decodeRemoteException(e); } @@ -954,7 +968,20 @@ } return server; } - + + /* + * @param regionServer + * @return Implementation of HRegionInterface. + * @throws IOException + */ + private HRegionInterface createHRegionServer(final HServerAddress regionServer) + throws IOException { + return (HRegionInterface) HBaseRPC.waitForProxy( + this.serverInterfaceClass, HBaseRPCProtocolVersion.versionID, + regionServer.getInetSocketAddress(), this.conf, this.maxRPCAttempts, + this.rpcTimeout); + } + public HRegionInterface getHRegionConnection( HServerAddress regionServer) throws IOException {