diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java index 8632328..dcf019d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.io.hfile; import java.io.DataOutputStream; import java.io.IOException; +import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -261,9 +262,9 @@ public abstract class AbstractHFileWriter implements HFile.Writer { /** A helper method to create HFile output streams in constructors */ protected static FSDataOutputStream createOutputStream(Configuration conf, - FileSystem fs, Path path) throws IOException { + FileSystem fs, Path path, InetSocketAddress[] favoredNodes) throws IOException { FsPermission perms = FSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY); - return FSUtils.create(fs, path, perms); + return FSUtils.create(fs, path, perms, favoredNodes); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java index fde4689..1309a2b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java @@ -25,6 +25,7 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.io.SequenceInputStream; +import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; @@ -338,6 +339,7 @@ public class HFile { HFile.DEFAULT_COMPRESSION_ALGORITHM; protected HFileDataBlockEncoder encoder = NoOpDataBlockEncoder.INSTANCE; protected KeyComparator comparator; + protected InetSocketAddress[] favoredNodes; protected ChecksumType checksumType = HFile.DEFAULT_CHECKSUM_TYPE; protected int bytesPerChecksum = DEFAULT_BYTES_PER_CHECKSUM; protected boolean includeMVCCReadpoint = true; @@ -390,6 +392,12 @@ public class HFile { return this; } + public WriterFactory withFavoredNodes(InetSocketAddress[] favoredNodes) { + // Deliberately not checking for null here. + this.favoredNodes = favoredNodes; + return this; + } + public WriterFactory withChecksumType(ChecksumType checksumType) { Preconditions.checkNotNull(checksumType); this.checksumType = checksumType; @@ -416,7 +424,7 @@ public class HFile { "filesystem/path or path"); } if (path != null) { - ostream = AbstractHFileWriter.createOutputStream(conf, fs, path); + ostream = AbstractHFileWriter.createOutputStream(conf, fs, path, favoredNodes); } return createWriter(fs, path, ostream, blockSize, compression, encoder, comparator, checksumType, bytesPerChecksum, includeMVCCReadpoint); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java index ee60c77..e4ac064 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java @@ -115,7 +115,7 @@ public class HFileWriterV2 extends AbstractHFileWriter { final KeyComparator comparator, final ChecksumType checksumType, final int bytesPerChecksum, final boolean includeMVCCReadpoint) throws IOException { super(cacheConf, - ostream == null ? createOutputStream(conf, fs, path) : ostream, + ostream == null ? createOutputStream(conf, fs, path, null) : ostream, path, blockSize, compressAlgo, blockEncoder, comparator); this.checksumType = checksumType; this.bytesPerChecksum = bytesPerChecksum; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FavoredNodesForRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FavoredNodesForRegion.java new file mode 100644 index 0000000..993c0ad --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FavoredNodesForRegion.java @@ -0,0 +1,47 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.net.InetSocketAddress; +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName; + +/** + * Abstraction that allows different modules in RegionServer to update/get + * the favored nodes information for regions. + */ +@InterfaceAudience.Private +interface FavoredNodesForRegion { + /** + * Used to update the favored nodes mapping when required. + * @param encodedRegionName + * @param favoredNodes + */ + void updateRegionFavoredNodesMapping(String encodedRegionName, List favoredNodes); + + /** + * Get the favored nodes mapping for this region. Used when the HDFS create API + * is invoked to pass in favored nodes hints for new region files. + * @param encodedRegionName + * @param favoredNodes + */ + InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName); +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java index 28279ba..6128c0a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java @@ -701,7 +701,7 @@ public class HRegionFileSystem { // First check to get the permissions FsPermission perms = FSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY); // Write the RegionInfo file content - FSDataOutputStream out = FSUtils.create(fs, regionInfoFile, perms); + FSDataOutputStream out = FSUtils.create(fs, regionInfoFile, perms, null); try { out.write(content); } finally { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 2876579..db199f1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -3516,7 +3516,8 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa return builder.build(); } - private void updateRegionFavoredNodesMapping(String encodedRegionName, + @Override + public void updateRegionFavoredNodesMapping(String encodedRegionName, List favoredNodes) { InetSocketAddress[] addr = new InetSocketAddress[favoredNodes.size()]; // Refer to the comment on the declaration of regionFavoredNodesMap on why @@ -3534,6 +3535,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa * @param encodedRegionName * @return array of favored locations */ + @Override public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) { return regionFavoredNodesMap.get(encodedRegionName); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 6b8a610..14398ce 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; import java.io.InterruptedIOException; +import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -790,6 +791,11 @@ public class HStore implements Store { } else { writerCacheConf = cacheConf; } + InetSocketAddress[] favoredNodes = null; + if (region.getRegionServerServices() != null) { + favoredNodes = region.getRegionServerServices().getFavoredNodesForRegion( + region.getRegionInfo().getEncodedName()); + } StoreFile.Writer w = new StoreFile.WriterBuilder(conf, writerCacheConf, this.getFileSystem(), blocksize) .withFilePath(fs.createTempName()) @@ -800,6 +806,7 @@ public class HStore implements Store { .withChecksumType(checksumType) .withBytesPerChecksum(bytesPerChecksum) .withCompression(compression) + .withFavoredNodes(favoredNodes) .includeMVCCReadpoint(includeMVCCReadpoint) .build(); return w; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java index 3a6a4a1..03f4139 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java @@ -25,7 +25,6 @@ import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.catalog.CatalogTracker; import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.ipc.RpcServerInterface; @@ -37,7 +36,7 @@ import org.apache.zookeeper.KeeperException; * Services provided by {@link HRegionServer} */ @InterfaceAudience.Private -public interface RegionServerServices extends OnlineRegions { +public interface RegionServerServices extends OnlineRegions, FavoredNodesForRegion { /** * @return True if this regionserver is stopping. */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java index 7fbdeb5..b22b9ac 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver; import java.io.DataInput; import java.io.FileNotFoundException; import java.io.IOException; +import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collection; @@ -529,6 +530,7 @@ public class StoreFile { private long maxKeyCount = 0; private Path dir; private Path filePath; + private InetSocketAddress[] favoredNodes; private ChecksumType checksumType = HFile.DEFAULT_CHECKSUM_TYPE; private int bytesPerChecksum = HFile.DEFAULT_BYTES_PER_CHECKSUM; private boolean includeMVCCReadpoint = true; @@ -571,6 +573,15 @@ public class StoreFile { return this; } + /** + * @param favoredNodes an array of favored nodes or possibly null + * @return this (for chained invocation) + */ + public WriterBuilder withFavoredNodes(InetSocketAddress[] favoredNodes) { + this.favoredNodes = favoredNodes; + return this; + } + public WriterBuilder withDataBlockEncoder(HFileDataBlockEncoder encoder) { Preconditions.checkNotNull(encoder); this.dataBlockEncoder = encoder; @@ -659,7 +670,7 @@ public class StoreFile { } return new Writer(fs, filePath, blockSize, compressAlgo, dataBlockEncoder, conf, cacheConf, comparator, bloomType, maxKeyCount, checksumType, - bytesPerChecksum, includeMVCCReadpoint); + bytesPerChecksum, includeMVCCReadpoint, favoredNodes); } } @@ -764,6 +775,7 @@ public class StoreFile { * @param checksumType the checksum type * @param bytesPerChecksum the number of bytes per checksum value * @param includeMVCCReadpoint whether to write the mvcc readpoint to the file for each KV + * @param favoredNodes * @throws IOException problem writing to FS */ private Writer(FileSystem fs, Path path, int blocksize, @@ -772,7 +784,8 @@ public class StoreFile { CacheConfig cacheConf, final KVComparator comparator, BloomType bloomType, long maxKeys, final ChecksumType checksumType, final int bytesPerChecksum, - final boolean includeMVCCReadpoint) throws IOException { + final boolean includeMVCCReadpoint, InetSocketAddress[] favoredNodes) + throws IOException { this.dataBlockEncoder = dataBlockEncoder != null ? dataBlockEncoder : NoOpDataBlockEncoder.INSTANCE; writer = HFile.getWriterFactory(conf, cacheConf) @@ -783,6 +796,7 @@ public class StoreFile { .withComparator(comparator.getRawComparator()) .withChecksumType(checksumType) .withBytesPerChecksum(bytesPerChecksum) + .withFavoredNodes(favoredNodes) .includeMVCCReadpoint(includeMVCCReadpoint) .create(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java index 5eea89d..8da23ad 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java @@ -24,7 +24,9 @@ import java.io.EOFException; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; +import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.net.InetSocketAddress; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; @@ -56,6 +58,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.RemoteExceptionHandler; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.exceptions.FileSystemVersionException; +import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.FSProtos; @@ -66,6 +69,7 @@ import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; @@ -262,11 +266,42 @@ public abstract class FSUtils { * * @param fs {@link FileSystem} on which to write the file * @param path {@link Path} to the file to write + * @param perm permissions + * @param favoredNodes * @return output stream to the created file * @throws IOException if the file cannot be created */ public static FSDataOutputStream create(FileSystem fs, Path path, - FsPermission perm) throws IOException { + FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException { + if (fs instanceof HFileSystem) { + FileSystem backingFs = ((HFileSystem)fs).getBackingFs(); + if (backingFs instanceof DistributedFileSystem) { + // Try to use the favoredNodes version via reflection to allow backwards- + // compatibility. + try { + return (FSDataOutputStream) (DistributedFileSystem.class + .getDeclaredMethod("create", Path.class, FsPermission.class, + boolean.class, int.class, short.class, long.class, + Progressable.class, InetSocketAddress[].class) + .invoke(backingFs, path, FsPermission.getDefault(), true, + getDefaultBufferSize(backingFs), + getDefaultReplication(backingFs, path), + getDefaultBlockSize(backingFs, path), + null, favoredNodes)); + } catch (InvocationTargetException ite) { + // Function was properly called, but threw it's own exception. + throw new IOException(ite.getCause()); + } catch (NoSuchMethodException e) { + LOG.debug("Ignoring (most likely Reflection related exception) " + e); + } catch (IllegalArgumentException e) { + LOG.debug("Ignoring (most likely Reflection related exception) " + e); + } catch (SecurityException e) { + LOG.debug("Ignoring (most likely Reflection related exception) " + e); + } catch (IllegalAccessException e) { + LOG.debug("Ignoring (most likely Reflection related exception) " + e); + } + } + } return create(fs, path, perm, true); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java index 820b99e..aac976e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.master; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; +import java.net.InetSocketAddress; import java.util.List; import java.util.Map; import java.util.Random; @@ -534,6 +535,16 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices { } @Override + public void updateRegionFavoredNodesMapping(String encodedRegionName, + List favoredNodes) { + } + + @Override + public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) { + return null; + } + + @Override public MultiResponse replay(RpcController controller, MultiRequest request) throws ServiceException { // TODO Auto-generated method stub diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionFavoredNodes.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionFavoredNodes.java new file mode 100644 index 0000000..fc2760c --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionFavoredNodes.java @@ -0,0 +1,164 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.fail; + +import java.lang.reflect.Method; +import java.net.InetSocketAddress; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.util.Progressable; +import org.junit.AfterClass; +import org.junit.Assume; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Tests the ability to specify favored nodes for a region. + */ +@Category(MediumTests.class) +public class TestRegionFavoredNodes { + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static HTable table; + private static final byte[] TABLE_NAME = Bytes.toBytes("table"); + private static final byte[] COLUMN_FAMILY = Bytes.toBytes("family"); + private static final int FAVORED_NODES_NUM = 3; + private static final int REGION_SERVERS = 6; + private static final int FLUSHES = 3; + private static Method createWithFavoredNode = null; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + try { + createWithFavoredNode = DistributedFileSystem.class.getDeclaredMethod("create", Path.class, + FsPermission.class, boolean.class, int.class, short.class, long.class, + Progressable.class, InetSocketAddress[].class); + } catch (NoSuchMethodException nm) { + return; + } + TEST_UTIL.startMiniCluster(REGION_SERVERS); + table = TEST_UTIL.createTable(TABLE_NAME, COLUMN_FAMILY); + TEST_UTIL.createMultiRegions(table, COLUMN_FAMILY); + TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + if (createWithFavoredNode == null) { + return; + } + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testFavoredNodes() throws Exception { + Assume.assumeTrue(createWithFavoredNode != null); + // Get the addresses of the datanodes in the cluster. + InetSocketAddress[] nodes = new InetSocketAddress[REGION_SERVERS]; + List datanodes = TEST_UTIL.getDFSCluster().getDataNodes(); + Method selfAddress; + try { + selfAddress = DataNode.class.getMethod("getSelfAddr"); + } catch (NoSuchMethodException ne) { + selfAddress = DataNode.class.getMethod("getXferAddress"); + } + for (int i = 0; i < REGION_SERVERS; i++) { + nodes[i] = (InetSocketAddress)selfAddress.invoke(datanodes.get(i)); + } + + String[] nodeNames = new String[REGION_SERVERS]; + for (int i = 0; i < REGION_SERVERS; i++) { + nodeNames[i] = nodes[i].getAddress().getHostAddress() + ":" + + nodes[i].getPort(); + } + + // For each region, choose some datanodes as the favored nodes then assign + // them as favored nodes through the HRegion. + for (int i = 0; i < REGION_SERVERS; i++) { + HRegionServer server = TEST_UTIL.getHBaseCluster().getRegionServer(i); + List regions = server.getOnlineRegions(TABLE_NAME); + for (HRegion region : regions) { + ListfavoredNodes = + new ArrayList(3); + String encodedRegionName = region.getRegionInfo().getEncodedName(); + for (int j = 0; j < FAVORED_NODES_NUM; j++) { + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder b = + org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.newBuilder(); + b.setHostName(nodes[(i + j) % REGION_SERVERS].getAddress().getHostAddress()); + b.setPort(nodes[(i + j) % REGION_SERVERS].getPort()); + b.setStartCode(-1); + favoredNodes.add(b.build()); + } + server.updateRegionFavoredNodesMapping(encodedRegionName, favoredNodes); + } + } + + // Write some data to each region and flush. Repeat some number of times to + // get multiple files for each region. + for (int i = 0; i < FLUSHES; i++) { + TEST_UTIL.loadTable(table, COLUMN_FAMILY); + TEST_UTIL.flush(); + } + + // For each region, check the block locations of each file and ensure that + // they are consistent with the favored nodes for that region. + for (int i = 0; i < REGION_SERVERS; i++) { + HRegionServer server = TEST_UTIL.getHBaseCluster().getRegionServer(i); + List regions = server.getOnlineRegions(TABLE_NAME); + for (HRegion region : regions) { + List files = region.getStoreFileList(new byte[][]{COLUMN_FAMILY}); + for (String file : files) { + FileStatus status = TEST_UTIL.getDFSCluster().getFileSystem(). + getFileStatus(new Path(new URI(file).getPath())); + BlockLocation[] lbks = + ((DistributedFileSystem)TEST_UTIL.getDFSCluster().getFileSystem()) + .getFileBlockLocations(status, 0, Long.MAX_VALUE); + for (BlockLocation lbk : lbks) { + locations: + for (String info : lbk.getNames()) { + for (int j = 0; j < FAVORED_NODES_NUM; j++) { + if (info.equals(nodeNames[(i + j) % REGION_SERVERS])) { + continue locations; + } + } + // This block was at a location that was not a favored location. + fail("Block location " + info + " not a favored node"); + } + } + } + } + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MockRegionServerServices.java index 1d2c128..8975cb0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MockRegionServerServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MockRegionServerServices.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.util; import java.io.IOException; +import java.net.InetSocketAddress; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -193,6 +194,16 @@ public class MockRegionServerServices implements RegionServerServices { } @Override + public void updateRegionFavoredNodesMapping(String encodedRegionName, + List favoredNodes) { + } + + @Override + public InetSocketAddress[] getFavoredNodesForRegion(String encodedRegionName) { + return null; + } + + @Override public Map getRecoveringRegions() { // TODO Auto-generated method stub return null; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java index e2d3c3e..a333c8e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java @@ -247,7 +247,7 @@ public class TestFSUtils { // then that the correct file is created Path p = new Path("target" + File.separator + UUID.randomUUID().toString()); try { - FSDataOutputStream out = FSUtils.create(fs, p, filePerm); + FSDataOutputStream out = FSUtils.create(fs, p, filePerm, null); out.close(); FileStatus stat = fs.getFileStatus(p); assertEquals(new FsPermission("700"), stat.getPermission()); @@ -269,13 +269,13 @@ public class TestFSUtils { Path p = new Path(htu.getDataTestDir(), "temptarget" + File.separator + file); Path p1 = new Path(htu.getDataTestDir(), "temppath" + File.separator + file); try { - FSDataOutputStream out = FSUtils.create(fs, p, perms); + FSDataOutputStream out = FSUtils.create(fs, p, perms, null); out.close(); assertTrue("The created file should be present", FSUtils.isExists(fs, p)); // delete the file with recursion as false. Only the file will be deleted. FSUtils.delete(fs, p, false); // Create another file - FSDataOutputStream out1 = FSUtils.create(fs, p1, perms); + FSDataOutputStream out1 = FSUtils.create(fs, p1, perms, null); out1.close(); // delete the file with recursion as false. Still the file only will be deleted FSUtils.delete(fs, p1, true);