Index: hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorder.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorder.java (revision 0)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/fs/TestBlockReorder.java (revision 0)
@@ -0,0 +1,461 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.fs;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.log4j.Level;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.ServerSocket;
+
+/**
+ * Tests for the HDFS block location reordering fix from HBASE-6435.
+ */
+@Category(LargeTests.class)
+public class TestBlockReorder {
+  private static final Log LOG = LogFactory.getLog(TestBlockReorder.class);
+
+  static {
+    ((Log4JLogger) DFSClient.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger) HFileSystem.LOG).getLogger().setLevel(Level.ALL);
+  }
+
+  private Configuration conf;
+  private MiniDFSCluster cluster;
+  private HBaseTestingUtility htu;
+  private DistributedFileSystem dfs;
+  private static final String host1 = "host1";
+  private static final String host2 = "host2";
+  private static final String host3 = "host3";
+
+  @Before
+  public void setUp() throws Exception {
+    htu = new HBaseTestingUtility();
+    htu.getConfiguration().setInt("dfs.block.size", 1024); // For the test with multiple blocks
+    htu.getConfiguration().setBoolean("dfs.support.append", true);
+    htu.getConfiguration().setInt("dfs.replication", 3);
+    // We assign one rack per datanode in the hope of always getting the same location order,
+    // but it does not work.
+    htu.startMiniDFSCluster(3,
+        new String[]{"/r1", "/r2", "/r3"}, new String[]{host1, host2, host3});
+
+    conf = htu.getConfiguration();
+    cluster = htu.getDFSCluster();
+    dfs = (DistributedFileSystem) FileSystem.get(conf);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    htu.shutdownMiniCluster();
+  }
+
+  /**
+   * Test that we can add a hook, and that this hook works when we try to read the file in HDFS.
+   */
+  @Test
+  public void testBlockLocationReorder() throws Exception {
+    Path p = new Path("hello");
+
+    Assert.assertTrue(cluster.getDataNodes().size() > 1);
+    final int repCount = 2;
+
+    // Let's write the file
+    FSDataOutputStream fop = dfs.create(p, (short) repCount);
+    final double toWrite = 875.5613;
+    fop.writeDouble(toWrite);
+    fop.close();
+
+    // Let's check we can read it while all the replicas are there
+    long start = System.currentTimeMillis();
+    FSDataInputStream fin = dfs.open(p);
+    Assert.assertTrue(toWrite == fin.readDouble());
+    long end = System.currentTimeMillis();
+    LOG.info("readtime= " + (end - start));
+    fin.close();
+    Assert.assertTrue((end - start) < 30 * 1000);
+
+    // Let's kill the first location. Note that the first location returned may change.
+    // The first thing to do is to get the location, then the port.
+    FileStatus f = dfs.getFileStatus(p);
+    BlockLocation[] lbs;
+    do {
+      lbs = dfs.getFileBlockLocations(f, 0, 1);
+    } while (lbs.length != 1 || lbs[0].getHosts().length != repCount);
+    final String name = lbs[0].getNames()[0];
+    Assert.assertTrue(name.indexOf(':') > 0);
+    String portS = name.substring(name.indexOf(':') + 1);
+    final int port = Integer.parseInt(portS);
+    LOG.info("port= " + port);
+    int ipcPort = -1;
+
+    // Let's find the DN to kill. The index used by cluster.getDataNodes(int) does not map to
+    // the port we have, so we need to iterate ourselves.
+    boolean ok = false;
+    final String lookup = lbs[0].getHosts()[0];
+    StringBuilder sb = new StringBuilder();
+    for (DataNode dn : cluster.getDataNodes()) {
+      final String dnName = getHostName(dn);
+      sb.append(dnName).append(' ');
+      if (lookup.equals(dnName)) {
+        ok = true;
+        LOG.info("killing datanode " + name + " / " + lookup);
+        ipcPort = dn.ipcServer.getListenerAddress().getPort();
+        dn.shutdown();
+        LOG.info("killed datanode " + name + " / " + lookup);
+        break;
+      }
+    }
+    Assert.assertTrue(
+        "didn't find the server to kill, was looking for " + lookup + " found " + sb, ok);
+    LOG.info("ipc port= " + ipcPort);
+
+    // Add the hook, with an implementation checking that we don't use the port we've just killed.
+    HFileSystem.addLocationOrderInterceptor(conf,
+        new HFileSystem.ReorderBlocks() {
+          @Override
+          public void reorderBlocks(Configuration c, LocatedBlocks lbs, String src) {
+            for (LocatedBlock lb : lbs.getLocatedBlocks()) {
+              if (lb.getLocations().length > 1) {
+                if (lb.getLocations()[0].getHostName().equals(lookup)) {
+                  LOG.info("HFileSystem bad host, inverting");
+                  DatanodeInfo tmp = lb.getLocations()[0];
+                  lb.getLocations()[0] = lb.getLocations()[1];
+                  lb.getLocations()[1] = tmp;
+                }
+              }
+            }
+          }
+        });
+
+    // We occupy the freed ports so that a connection attempt will time out later.
+    ServerSocket ss = new ServerSocket(port);
+    ServerSocket ssI = new ServerSocket(ipcPort);
+
+    // Without the reorder, the read fails with a timeout. Unfortunately the client does not
+    // always connect to the same box, so we try 10 times; with the reorder, a read never takes
+    // more than a few milliseconds.
+    for (int i = 0; i < 10; i++) {
+      start = System.currentTimeMillis();
+
+      fin = dfs.open(p);
+      Assert.assertTrue(toWrite == fin.readDouble());
+      fin.close();
+      end = System.currentTimeMillis();
+      LOG.info("HFileSystem readtime= " + (end - start));
+      Assert.assertFalse("We took too much time to read", (end - start) > 60000);
+    }
+    ss.close();
+    ssI.close();
+  }
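For readers unfamiliar with the hook contract exercised above: the interceptor receives the LocatedBlocks that the namenode returned and mutates the location order in place; the client then tries the locations in the new order. A minimal pass-through interceptor that only observes the order, without reordering anything, could look like the sketch below. This is illustrative only and assumes nothing beyond the ReorderBlocks interface and addLocationOrderInterceptor registration used in this test:

    // Illustrative sketch: log the replica order the namenode returned for each block.
    HFileSystem.addLocationOrderInterceptor(conf, new HFileSystem.ReorderBlocks() {
      @Override
      public void reorderBlocks(Configuration c, LocatedBlocks lbs, String src) {
        for (LocatedBlock lb : lbs.getLocatedBlocks()) {
          StringBuilder order = new StringBuilder("block order for ").append(src).append(':');
          for (DatanodeInfo di : lb.getLocations()) {
            order.append(' ').append(di.getHostName());
          }
          LOG.info(order.toString());
        }
      }
    });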
+
+  /**
+   * Returns the hostname of a datanode, via getDisplayName (hadoop 2) or getHostName (hadoop 1).
+   */
+  private String getHostName(DataNode dn)
+      throws InvocationTargetException, IllegalAccessException {
+    Method m;
+    try {
+      m = DataNode.class.getMethod("getDisplayName");
+    } catch (NoSuchMethodException e) {
+      try {
+        m = DataNode.class.getMethod("getHostName");
+      } catch (NoSuchMethodException e1) {
+        throw new RuntimeException(e1);
+      }
+    }
+
+    String res = (String) m.invoke(dn);
+    if (res.contains(":")) {
+      return res.split(":")[0];
+    } else {
+      return res;
+    }
+  }
+
+  /**
+   * Test that the hook works within HBase, including when there are multiple blocks.
+   */
+  @Test
+  public void testHBaseCluster() throws Exception {
+    byte[] sb = "sb".getBytes();
+    htu.startMiniZKCluster();
+
+    MiniHBaseCluster hbm = htu.startMiniHBaseCluster(1, 1);
+    hbm.waitForActiveAndReadyMaster();
+    hbm.getRegionServer(0).waitForServerOnline();
+
+    // We want a datanode with the same name as the region server, so we get the
+    // region server name and start a new datanode with this name.
+    String host4 = hbm.getRegionServer(0).getServerName().getHostname();
+    LOG.info("Starting a new datanode with the name=" + host4);
+    cluster.startDataNodes(conf, 1, true, null, new String[]{"/r4"}, new String[]{host4}, null);
+    cluster.waitClusterUp();
+
+    final int repCount = 3;
+    HRegionServer targetRs = hbm.getRegionServer(0);
+
+    // We use the region server's file system & conf, as we expect them to carry the hook.
+    conf = targetRs.getConfiguration();
+    HFileSystem rfs = (HFileSystem) targetRs.getFileSystem();
+    HTable h = htu.createTable("table".getBytes(), sb);
+
+    // Now we have 4 datanodes and a replication count of 3, so we don't know whether the
+    // datanode with the same name as the region server will be used. We can't really stop an
+    // existing datanode, as that would trip nasty hdfs bugs/issues, so we try multiple times.
+
+    // Now we need to find the log file, its locations, and look at it
+    String rootDir = FileSystem.get(conf).makeQualified(new Path(
+        conf.get(HConstants.HBASE_DIR) + "/" + HConstants.HREGION_LOGDIR_NAME +
+            "/" + targetRs.getServerName().toString())).toUri().getPath();
+
+    DistributedFileSystem mdfs = (DistributedFileSystem)
+        hbm.getMaster().getMasterFileSystem().getFileSystem();
+
+    int nbTest = 0;
+    while (nbTest < 10) {
+      htu.getHBaseAdmin().rollHLogWriter(targetRs.getServerName().toString());
+
+      // We need a sleep as the namenode is informed asynchronously
+      Thread.sleep(100);
+
+      // Insert one put to ensure a minimal size
+      Put p = new Put(sb);
+      p.add(sb, sb, sb);
+      h.put(p);
+
+      DirectoryListing dl = dfs.getClient().listPaths(rootDir, HdfsFileStatus.EMPTY_NAME);
+      HdfsFileStatus[] hfs = dl.getPartialListing();
+
+      // As we wrote a put, we should have at least one log file.
+      Assert.assertTrue(hfs.length >= 1);
+      for (HdfsFileStatus hf : hfs) {
+        LOG.info("Log file found: " + hf.getLocalName() + " in " + rootDir);
+        String logFile = rootDir + "/" + hf.getLocalName();
+        FileStatus fsLog = rfs.getFileStatus(new Path(logFile));
+
+        LOG.info("Checking log file: " + logFile);
+        // Now check that the hook is up and running. We can't call getBlockLocations directly,
+        // as it is not exposed by HFileSystem, so we go through getFileBlockLocations.
+        // We check multiple times, as the initial order is random.
+        BlockLocation[] bls = rfs.getFileBlockLocations(fsLog, 0, 1);
+        if (bls.length > 0) {
+          BlockLocation bl = bls[0];
+
+          LOG.info(bl.getHosts().length + " replicas for block 0 in " + logFile);
+          for (int i = 0; i < bl.getHosts().length - 1; i++) {
+            LOG.info(bl.getHosts()[i] + " " + logFile);
+            Assert.assertFalse(host4.equals(bl.getHosts()[i]));
+          }
+          String last = bl.getHosts()[bl.getHosts().length - 1];
+          LOG.info(last + " " + logFile);
+          if (host4.equals(last)) {
+            nbTest++;
+            LOG.info(logFile + " is on the new datanode and is ok");
+            if (bl.getHosts().length == 3) {
+              // We can check this case from the file system as well,
+              // multiple times as the order is random.
+              testFromDFS(dfs, logFile, repCount, host4);
+
+              // Now from the master
+              testFromDFS(mdfs, logFile, repCount, host4);
+            }
+          }
+        }
+      }
+    }
+  }
+
+  private void testFromDFS(DistributedFileSystem dfs, String src, int repCount, String localhost)
+      throws Exception {
+    // Multiple times as the order is random
+    for (int i = 0; i < 10; i++) {
+      LocatedBlocks l;
+      // The NN gets the block list asynchronously, so we may need multiple tries to get the list
+      final long max = System.currentTimeMillis() + 10000;
+      boolean done;
+      do {
+        Assert.assertTrue("Can't get enough replicas.", System.currentTimeMillis() < max);
+        l = getNamenode(dfs.getClient()).getBlockLocations(src, 0, 1);
+        Assert.assertNotNull("Can't get block locations for " + src, l);
+        Assert.assertNotNull(l.getLocatedBlocks());
+        Assert.assertTrue(l.getLocatedBlocks().size() > 0);
+
+        done = true;
+        for (int y = 0; y < l.getLocatedBlocks().size() && done; y++) {
+          done = (l.get(y).getLocations().length == repCount);
+        }
+      } while (!done);
+
+      for (int y = 0; y < l.getLocatedBlocks().size(); y++) {
+        Assert.assertEquals(localhost, l.get(y).getLocations()[repCount - 1].getHostName());
+      }
+    }
+  }
+
+  private static ClientProtocol getNamenode(DFSClient dfsc) throws Exception {
+    Field nf = DFSClient.class.getDeclaredField("namenode");
+    nf.setAccessible(true);
+    return (ClientProtocol) nf.get(dfsc);
+  }
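The wait-until-replicated pattern above (poll the namenode against a deadline until every block reports repCount locations) is repeated in testBlockLocation below. A possible extraction into a shared helper is sketched here; waitForReplicas is a hypothetical name, not part of the patch, and it reuses the getNamenode reflection helper defined above:

    // Sketch only: poll the NN until the first blocks of 'src' report repCount replicas.
    private LocatedBlocks waitForReplicas(DistributedFileSystem fs, String src, int repCount)
        throws Exception {
      final long max = System.currentTimeMillis() + 10000;
      LocatedBlocks l;
      boolean done;
      do {
        Assert.assertTrue("replication timeout for " + src, System.currentTimeMillis() < max);
        l = getNamenode(fs.getClient()).getBlockLocations(src, 0, 1);
        Assert.assertNotNull(l);
        done = l.getLocatedBlocks().size() > 0;
        for (int y = 0; y < l.getLocatedBlocks().size() && done; y++) {
          done = (l.get(y).getLocations().length == repCount);
        }
      } while (!done);
      return l;
    }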
+
+  /**
+   * Test that the reorder algorithm works as we expect.
+   */
+  @Test
+  public void testBlockLocation() throws Exception {
+    // We need to start HBase to get HConstants.HBASE_DIR set in conf
+    htu.startMiniZKCluster();
+    MiniHBaseCluster hbm = htu.startMiniHBaseCluster(1, 1);
+    conf = hbm.getConfiguration();
+
+    // The "/" is mandatory; without it we get a NullPointerException on the namenode.
+    final String fileName = "/helloWorld";
+    Path p = new Path(fileName);
+
+    final int repCount = 3;
+    Assert.assertTrue(cluster.getDataNodes().size() >= repCount);
+
+    // Let's write the file
+    FSDataOutputStream fop = dfs.create(p, (short) repCount);
+    final double toWrite = 875.5613;
+    fop.writeDouble(toWrite);
+    fop.close();
+
+    // The interceptor is not set in this test, so we get the raw list at this point
+    LocatedBlocks l;
+    final long max = System.currentTimeMillis() + 10000;
+    do {
+      l = getNamenode(dfs.getClient()).getBlockLocations(fileName, 0, 1);
+      Assert.assertNotNull(l.getLocatedBlocks());
+      Assert.assertEquals(1, l.getLocatedBlocks().size());
+      Assert.assertTrue("Expecting " + repCount + ", got " + l.get(0).getLocations().length,
+          System.currentTimeMillis() < max);
+    } while (l.get(0).getLocations().length != repCount);
+
+    // Let's set our own order
+    setOurOrder(l);
+
+    HFileSystem.LogReorderBlocks lrb = new HFileSystem.LogReorderBlocks();
+    // Should be filtered out, as the name is not a log file name
+    lrb.reorderBlocks(conf, l, fileName);
+    checkOurOrder(l);
+
+    // Should be reordered, as we pretend to be a log file with a compliant name
+    Assert.assertNotNull(conf.get(HConstants.HBASE_DIR));
+    Assert.assertFalse(conf.get(HConstants.HBASE_DIR).isEmpty());
+    String pseudoLogFile = conf.get(HConstants.HBASE_DIR) + "/" +
+        HConstants.HREGION_LOGDIR_NAME + "/" + host1 + ",6977,6576" + "/mylogfile";
+
+    // Check that it will be possible to extract a ServerName from our construction
+    Assert.assertNotNull("log= " + pseudoLogFile,
+        HLog.getServerNameFromHLogDirectoryName(dfs.getConf(), pseudoLogFile));
+
+    // And check we're doing the right reorder
+    lrb.reorderBlocks(conf, l, pseudoLogFile);
+    checkOurFixedOrder(l);
+
+    // And change the order again, and check again
+    l.get(0).getLocations()[0].setHostName(host2);
+    l.get(0).getLocations()[1].setHostName(host1);
+    l.get(0).getLocations()[2].setHostName(host3);
+    lrb.reorderBlocks(conf, l, pseudoLogFile);
+    checkOurFixedOrder(l);
+
+    // Same change and check, one more time
+    l.get(0).getLocations()[0].setHostName(host2);
+    l.get(0).getLocations()[1].setHostName(host1);
+    l.get(0).getLocations()[2].setHostName(host3);
+    lrb.reorderBlocks(conf, l, pseudoLogFile);
+    checkOurFixedOrder(l);
+
+    // Nothing to do here (host1 is already last), but let's check
+    l.get(0).getLocations()[0].setHostName(host2);
+    l.get(0).getLocations()[1].setHostName(host3);
+    l.get(0).getLocations()[2].setHostName(host1);
+    lrb.reorderBlocks(conf, l, pseudoLogFile);
+    checkOurFixedOrder(l);
+
+    // Nothing to do here either (host1 is absent), check again
+    l.get(0).getLocations()[0].setHostName(host2);
+    l.get(0).getLocations()[1].setHostName(host3);
+    l.get(0).getLocations()[2].setHostName("nothing");
+    lrb.reorderBlocks(conf, l, pseudoLogFile);
+    Assert.assertEquals(host2, l.get(0).getLocations()[0].getHostName());
+    Assert.assertEquals(host3, l.get(0).getLocations()[1].getHostName());
+    Assert.assertEquals("nothing", l.get(0).getLocations()[2].getHostName());
+  }
+
+  private void setOurOrder(LocatedBlocks l) {
+    l.get(0).getLocations()[0].setHostName(host1);
+    l.get(0).getLocations()[1].setHostName(host2);
+    l.get(0).getLocations()[2].setHostName(host3);
+  }
+
+  private void checkOurOrder(LocatedBlocks l) {
+    Assert.assertEquals(host1, l.get(0).getLocations()[0].getHostName());
+    Assert.assertEquals(host2, l.get(0).getLocations()[1].getHostName());
+    Assert.assertEquals(host3, l.get(0).getLocations()[2].getHostName());
+  }
+
+  private void checkOurFixedOrder(LocatedBlocks l) {
+    Assert.assertEquals(host2, l.get(0).getLocations()[0].getHostName());
+    Assert.assertEquals(host3, l.get(0).getLocations()[1].getHostName());
+    Assert.assertEquals(host1, l.get(0).getLocations()[2].getHostName());
+  }
+
+  @org.junit.Rule
+  public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
+      new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
+}
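The setUp() method above relies on a new HBaseTestingUtility overload, added in the next hunk, that pins one rack path and one hostname per datanode. For reference, a minimal caller looks like this (hostnames are arbitrary labels):

    HBaseTestingUtility htu = new HBaseTestingUtility();
    // one rack and one hostname per datanode
    htu.startMiniDFSCluster(3, new String[]{"/r1", "/r2", "/r3"},
        new String[]{"host1", "host2", "host3"});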
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (revision 1369262)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (working copy)
@@ -439,6 +439,25 @@
     return this.dfsCluster;
   }
 
+  public MiniDFSCluster startMiniDFSCluster(int servers, final String racks[], String hosts[])
+      throws Exception {
+    createDirsAndSetProperties();
+    this.dfsCluster = new MiniDFSCluster(0, this.conf, servers, true, true,
+        true, null, racks, hosts, null);
+
+    // Set this just-started cluster as our filesystem.
+    FileSystem fs = this.dfsCluster.getFileSystem();
+    this.conf.set("fs.defaultFS", fs.getUri().toString());
+    // Do old style too just to be safe.
+    this.conf.set("fs.default.name", fs.getUri().toString());
+
+    // Wait for the cluster to be totally up
+    this.dfsCluster.waitClusterUp();
+
+    return this.dfsCluster;
+  }
+
   public MiniDFSCluster startMiniDFSClusterForTestHLog(int namenodePort) throws IOException {
     createDirsAndSetProperties();
     dfsCluster = new MiniDFSCluster(namenodePort, conf, 5, false, true, true, null,
@@ -637,6 +656,11 @@
     return startMiniHBaseCluster(numMasters, numSlaves);
   }
 
+  public MiniHBaseCluster startMiniHBaseCluster(final int numMasters, final int numSlaves)
+      throws IOException, InterruptedException {
+    return startMiniHBaseCluster(numMasters, numSlaves, null, null);
+  }
+
   /**
    * Starts up mini hbase cluster.  Usually used after call to
    * {@link #startMiniCluster(int, int)} when doing stepped startup of clusters.
@@ -649,7 +673,8 @@
    * @see {@link #startMiniCluster()}
    */
   public MiniHBaseCluster startMiniHBaseCluster(final int numMasters,
-      final int numSlaves)
+      final int numSlaves, Class<? extends HMaster> masterClass,
+      Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
   throws IOException, InterruptedException {
     // Now do the mini hbase cluster.  Set the hbase.rootdir in config.
     createRootDir();
@@ -660,7 +685,8 @@
     conf.setInt("hbase.master.wait.on.regionservers.maxtostart", numSlaves);
     Configuration c = new Configuration(this.conf);
-    this.hbaseCluster = new MiniHBaseCluster(c, numMasters, numSlaves);
+    this.hbaseCluster =
+        new MiniHBaseCluster(c, numMasters, numSlaves, masterClass, regionserverClass);
     // Don't leave here till we've done a successful scan of the .META.
     HTable t = new HTable(c, HConstants.META_TABLE_NAME);
     ResultScanner s = t.getScanner(new Scan());
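The four-argument overload above lets a test inject subclasses of HMaster and of MiniHBaseCluster.MiniHBaseClusterRegionServer; passing null for either falls back to the default class. A hypothetical caller (MyFailingRegionServer is an assumed test subclass of MiniHBaseClusterRegionServer, not part of the patch):

    // Start one master (default HMaster) and one custom regionserver.
    MiniHBaseCluster hbm = htu.startMiniHBaseCluster(1, 1, null, MyFailingRegionServer.class);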
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java (revision 1369262)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java (working copy)
@@ -53,6 +53,7 @@
 import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -720,6 +721,29 @@
     }
   }
 
+  @Test
+  public void testGetServerNameFromHLogDirectoryName() throws IOException {
+    String hl = conf.get(HConstants.HBASE_DIR) + "/" +
+        HLog.getHLogDirectoryName(new ServerName("hn", 450, 1398).toString());
+
+    // Must not throw exception
+    Assert.assertNull(HLog.getServerNameFromHLogDirectoryName(conf, null));
+    Assert.assertNull(HLog.getServerNameFromHLogDirectoryName(conf,
+        conf.get(HConstants.HBASE_DIR) + "/"));
+    Assert.assertNull(HLog.getServerNameFromHLogDirectoryName(conf, ""));
+    Assert.assertNull(HLog.getServerNameFromHLogDirectoryName(conf, " "));
+    Assert.assertNull(HLog.getServerNameFromHLogDirectoryName(conf, hl));
+    Assert.assertNull(HLog.getServerNameFromHLogDirectoryName(conf, hl + "qdf"));
+    Assert.assertNull(HLog.getServerNameFromHLogDirectoryName(conf, "sfqf" + hl + "qdf"));
+
+    Assert.assertNotNull(HLog.getServerNameFromHLogDirectoryName(conf, conf.get(
+        HConstants.HBASE_DIR) +
+        "/.logs/localhost,32984,1343316388997/localhost%2C32984%2C1343316388997.1343316390417"
+    ));
+    Assert.assertNotNull(HLog.getServerNameFromHLogDirectoryName(conf, hl + "/qdf"));
+  }
+
   /**
    * A loaded WAL coprocessor won't break existing HLog test cases.
    */
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java (revision 1369262)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java (working copy)
@@ -76,13 +76,22 @@
    * @throws IOException
    */
   public MiniHBaseCluster(Configuration conf, int numMasters,
-      int numRegionServers)
-      throws IOException, InterruptedException {
+      int numRegionServers)
+      throws IOException, InterruptedException {
     this.conf = conf;
     conf.set(HConstants.MASTER_PORT, "0");
-    init(numMasters, numRegionServers);
+    init(numMasters, numRegionServers, null, null);
   }
 
+  public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers,
+      Class<? extends HMaster> masterClass,
+      Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
+      throws IOException, InterruptedException {
+    this.conf = conf;
+    conf.set(HConstants.MASTER_PORT, "0");
+    init(numMasters, numRegionServers, masterClass, regionserverClass);
+  }
+
   public Configuration getConfiguration() {
     return this.conf;
   }
@@ -186,12 +195,21 @@
     }
   }
 
-  private void init(final int nMasterNodes, final int nRegionNodes)
+  private void init(final int nMasterNodes, final int nRegionNodes,
+      Class<? extends HMaster> masterClass,
+      Class<? extends MiniHBaseCluster.MiniHBaseClusterRegionServer> regionserverClass)
   throws IOException, InterruptedException {
     try {
+      if (masterClass == null) {
+        masterClass = HMaster.class;
+      }
+      if (regionserverClass == null) {
+        regionserverClass = MiniHBaseCluster.MiniHBaseClusterRegionServer.class;
+      }
+
       // start up a LocalHBaseCluster
       hbaseCluster = new LocalHBaseCluster(conf, nMasterNodes, 0,
-          HMaster.class, MiniHBaseCluster.MiniHBaseClusterRegionServer.class);
+          masterClass, regionserverClass);
 
       // manually add the regionservers as other users
       for (int i=0; i<nRegionNodes; i++) {
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java (revision 1369262)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/fs/HFileSystem.java (working copy)
+      for (LocatedBlock lb : lbs.getLocatedBlocks()) {
+        DatanodeInfo[] dnis = lb.getLocations();
+        if (dnis != null && dnis.length > 1) {
+          boolean found = false;
+          for (int i = 0; i < dnis.length - 1 && !found; i++) {
+            if (hostName.equals(dnis[i].getHostName())) {
+              // advance the other locations by one and put this one at the last place.
+              DatanodeInfo toLast = dnis[i];
+              System.arraycopy(dnis, i + 1, dnis, i, dnis.length - i - 1);
+              dnis[dnis.length - 1] = toLast;
+              found = true;
+            }
+          }
+        }
+      }
+    }
+  }
+
   /**
    * Create a new HFileSystem object, similar to FileSystem.get().
    * This returns a filesystem object that avoids checksum
    * verification in the filesystem for hfileblock-reads.
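The arraycopy in the reorder loop above implements a "move the matching replica to the end" rotation rather than a swap, so the relative order of the other replicas is preserved. A standalone sketch of the same logic on plain strings, illustrative only and not part of the patch:

    public class ReorderDemo {
      public static void main(String[] args) {
        String[] dnis = {"host1", "host2", "host3"};  // replica order from the namenode
        String hostName = "host1";                    // the server that wrote the WAL
        for (int i = 0; i < dnis.length - 1; i++) {
          if (hostName.equals(dnis[i])) {
            // shift the following replicas left, append the matching one at the end
            String toLast = dnis[i];
            System.arraycopy(dnis, i + 1, dnis, i, dnis.length - i - 1);
            dnis[dnis.length - 1] = toLast;
            break;
          }
        }
        // prints [host2, host3, host1] -- the order checkOurFixedOrder() expects
        System.out.println(java.util.Arrays.toString(dnis));
      }
    }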
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (revision 1369262)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (working copy)
@@ -53,6 +53,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -151,6 +152,7 @@
 
   private WALCoprocessorHost coprocessorHost;
 
+
   static void resetLogReaderClass() {
     HLog.logReaderClass = null;
   }
@@ -1757,7 +1759,63 @@
     return dirName.toString();
   }
 
+  /**
+   * Returns null if it's not a log file; otherwise returns the ServerName of the
+   * region server that created the log file.
+   * The expected path format is: [hbase root directory]/.logs/[ServerName]/[logfile]
+   */
+  public static ServerName getServerNameFromHLogDirectoryName(Configuration conf, String path)
+      throws IOException {
+    if (path == null || path.length() <= HConstants.HREGION_LOGDIR_NAME.length()) {
+      return null;
+    }
+
+    if (conf == null) {
+      throw new IllegalArgumentException("parameter conf must be set");
+    }
+
+    final String rootDir = conf.get(HConstants.HBASE_DIR);
+    if (rootDir == null || rootDir.isEmpty()) {
+      LOG.info(HConstants.HBASE_DIR + " key not found in conf.");
+      return null;
+    }
+
+    final StringBuilder startPathSB = new StringBuilder(rootDir);
+    if (!rootDir.endsWith("/")) startPathSB.append('/');
+    startPathSB.append(HConstants.HREGION_LOGDIR_NAME);
+    if (!HConstants.HREGION_LOGDIR_NAME.endsWith("/")) startPathSB.append('/');
+    final String startPath = startPathSB.toString();
+
+    String fullPath;
+    try {
+      fullPath = FileSystem.get(conf).makeQualified(new Path(path)).toString();
+    } catch (IllegalArgumentException e) {
+      LOG.info("Call to makeQualified failed on " + path + " " + e.getMessage());
+      return null;
+    }
+
+    if (!fullPath.startsWith(startPath)) {
+      return null;
+    }
+
+    final String serverNameAndFile = fullPath.substring(startPath.length());
+
+    if (serverNameAndFile.indexOf('/') < "a,0,0".length()) {
+      // Either it's a file (not a directory), or it's not in the ServerName format
+      return null;
+    }
+
+    final String serverName = serverNameAndFile.substring(0, serverNameAndFile.indexOf('/'));
+
+    if (!ServerName.isFullServerName(serverName)) {
+      return null;
+    }
+
+    return ServerName.parseServerName(serverName);
+  }
+
   /**
    * Get the directory we are making logs in.
    *
    * @return dir
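To make the accepted path shapes concrete, here are the main cases exercised by testGetServerNameFromHLogDirectoryName above, as an illustrative snippet (rootDir stands for whatever HConstants.HBASE_DIR resolves to in the configuration):

    String rootDir = conf.get(HConstants.HBASE_DIR);
    // Well-formed: [rootDir]/.logs/[host,port,startcode]/[logfile] yields a ServerName
    Assert.assertNotNull(HLog.getServerNameFromHLogDirectoryName(conf, rootDir
        + "/.logs/localhost,32984,1343316388997/localhost%2C32984%2C1343316388997.1343316390417"));
    // Rejected (null): no server-name directory under .logs, or a path outside [rootDir]/.logs
    Assert.assertNull(HLog.getServerNameFromHLogDirectoryName(conf, rootDir + "/.logs/afile"));
    Assert.assertNull(HLog.getServerNameFromHLogDirectoryName(conf, "/tmp/whatever"));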
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/ServerName.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/ServerName.java (revision 1369262)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/ServerName.java (working copy)
@@ -301,7 +301,17 @@
       new ServerName(str, NON_STARTCODE);
   }
 
+  /**
+   * @return true if the String follows the pattern of {@link ServerName#toString()},
+   * false otherwise.
+   */
+  public static boolean isFullServerName(final String str) {
+    if (str == null || str.isEmpty()) return false;
+    return SERVERNAME_PATTERN.matcher(str).matches();
+  }
+
   /**
    * Get a ServerName from the passed in data bytes.
    * @param data Data with a serialize server name in it; can handle the old style
    * servername where servername was host and port.  Works too with data that
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java (revision 1369262)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java (working copy)
@@ -23,7 +23,6 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -45,9 +44,9 @@
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.backup.HFileArchiver;
+import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.master.metrics.MasterMetrics;
 import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
 import org.apache.hadoop.hbase.regionserver.wal.OrphanHLogAfterSplitException;
@@ -118,6 +117,7 @@
     // setup the filesystem variable
     // set up the archived logs path
     this.oldLogDir = createInitialFileSystemLayout();
+    HFileSystem.addLocationsOrderHack(conf);
   }
 
   /**
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (revision 1369262)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (working copy)
@@ -79,6 +79,7 @@
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType;
+import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.ipc.HBaseRPC;
 import org.apache.hadoop.hbase.ipc.HBaseServer;
 import org.apache.hadoop.hbase.master.metrics.MXBeanImpl;
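For reference, isFullServerName accepts exactly the host,port,startcode form produced by ServerName.toString() — the same shape as the host1,6977,6576 directory used in testBlockLocation above. A couple of illustrative checks (assumed behavior of SERVERNAME_PATTERN, not part of the patch):

    Assert.assertTrue(ServerName.isFullServerName("host1,6977,6576")); // host,port,startcode
    Assert.assertFalse(ServerName.isFullServerName("host1:6977"));     // host:port is not enough
    Assert.assertFalse(ServerName.isFullServerName(""));               // empty string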