diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
index 99be77a..c5b0ad1 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
@@ -193,8 +193,9 @@ public class HRegionFileSystem {
     ArrayList<StoreFileInfo> storeFiles = new ArrayList<StoreFileInfo>(files.length);
     for (FileStatus status: files) {
       if (!StoreFileInfo.isValid(status)) continue;
-
-      storeFiles.add(new StoreFileInfo(this.conf, this.fs, status));
+      StoreFileInfo info = ServerRegionReplicaUtil.getStoreFileInfo(conf, fs, regionInfo,
+        regionInfoForFs, familyName, status);
+      storeFiles.add(info);
     }
     return storeFiles;
   }
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index cbd34af..dc22f80 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -503,7 +503,7 @@ public class HStore implements Store {
       completionService.submit(new Callable<StoreFile>() {
         @Override
         public StoreFile call() throws IOException {
-          StoreFile storeFile = createStoreFileAndReader(storeFileInfo.getPath());
+          StoreFile storeFile = createStoreFileAndReader(storeFileInfo);
           return storeFile;
         }
       });
@@ -598,6 +598,10 @@ public class HStore implements Store {
 
   private StoreFile createStoreFileAndReader(final Path p) throws IOException {
     StoreFileInfo info = new StoreFileInfo(conf, this.getFileSystem(), p);
+    return createStoreFileAndReader(info);
+  }
+
+  private StoreFile createStoreFileAndReader(final StoreFileInfo info) throws IOException {
     info.setRegionCoprocessorHost(this.region.getCoprocessorHost());
     StoreFile storeFile = new StoreFile(this.getFileSystem(), info, this.conf, this.cacheConf,
       this.family.getBloomFilterType());
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
index 6d0c714..30cc070 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
@@ -35,7 +35,6 @@ import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.io.HalfStoreFileReader;
 import org.apache.hadoop.hbase.io.Reference;
-import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.util.FSUtils;
 
@@ -133,6 +132,23 @@ public class StoreFileInfo implements Comparable<StoreFileInfo> {
   }
 
   /**
+   * Create a Store File Info from an HFileLink
+   * @param conf the {@link Configuration} to use
+   * @param fs the {@link FileSystem} to use
+   * @param fileStatus the {@link FileStatus} of the file
+   * @param link the {@link HFileLink} pointing to the primary region's file
+   */
+  public StoreFileInfo(final Configuration conf, final FileSystem fs, final FileStatus fileStatus,
+      final HFileLink link)
+      throws IOException {
+    this.conf = conf;
+    this.fileStatus = fileStatus;
+    // HFileLink
+    this.reference = null;
+    this.link = link;
+  }
+
+  /**
    * Sets the region coprocessor env.
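Context for the HStore hunks above: the Path-based createStoreFileAndReader(...) is refactored into a thin wrapper over a new StoreFileInfo-based overload, so a caller that already holds a StoreFileInfo (after this patch, possibly one backed by an HFileLink) no longer round-trips it through a bare Path and loses the link. A minimal sketch of that delegation shape, using simplified hypothetical types rather than HBase's real ones:

    import java.io.IOException;
    import java.nio.file.Path;

    class StoreSketch {
      // Stand-in for StoreFileInfo: remembers whether it is backed by a link.
      static final class FileInfo {
        final Path path;
        final boolean isLink;
        FileInfo(Path path, boolean isLink) { this.path = path; this.isLink = isLink; }
      }

      // Old entry point: only a Path is known, so a plain descriptor is built...
      FileInfo openFromPath(Path p) throws IOException {
        return openFromInfo(new FileInfo(p, false));
      }

      // ...and the real work happens in the descriptor-based overload, which
      // preserves a link-backed descriptor instead of rebuilding it from a Path.
      FileInfo openFromInfo(FileInfo info) throws IOException {
        // reader construction would happen here in the real HStore
        return info;
      }
    }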
   * @param coprocessorHost
   */
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
index 69708db..c70b667 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
@@ -18,9 +18,16 @@
 package org.apache.hadoop.hbase.util;
 
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
+import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 
 /**
  * Similar to {@link RegionReplicaUtil} but for the server side
@@ -48,5 +55,27 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil {
         || !isDefaultReplica(region.getRegionInfo());
   }
 
+  /**
+   * Returns a StoreFileInfo from the given FileStatus. Secondary replicas refer to the
+   * files of the primary region, so an HFileLink is used to construct the StoreFileInfo.
+   * This ensures that the secondary will be able to continue reading the store files even
+   * if they are moved to the archive after a compaction.
+   * @throws IOException
+   */
+  public static StoreFileInfo getStoreFileInfo(Configuration conf, FileSystem fs,
+      HRegionInfo regionInfo, HRegionInfo regionInfoForFs, String familyName, FileStatus status)
+      throws IOException {
+
+    // if this is a primary region, just return the StoreFileInfo constructed from the path
+    if (regionInfo.equals(regionInfoForFs)) {
+      return new StoreFileInfo(conf, fs, status);
+    }
+
+    // else create a store file link. The link file itself does not exist on the filesystem.
+    HFileLink link = new HFileLink(conf,
+      HFileLink.createPath(regionInfoForFs.getTable(), regionInfoForFs.getEncodedName(),
+        familyName, status.getPath().getName()));
+    return new StoreFileInfo(conf, fs, status, link);
+  }
 }
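getStoreFileInfo() above is the core of the patch: a secondary replica has no store files of its own, so it names the primary's file by (table, encoded region name, family, hfile name) through an HFileLink, and that link can be resolved against more than one physical location. The sketch below only illustrates the idea; the directory layout and probing order shown are assumptions, and the real resolution logic lives inside HFileLink:

    import java.util.Arrays;
    import java.util.List;

    final class LinkSketch {
      // Candidate physical locations for the primary's hfile, probed in order.
      static List<String> candidateLocations(String table, String encodedRegion,
          String family, String hfile) {
        String suffix = table + "/" + encodedRegion + "/" + family + "/" + hfile;
        return Arrays.asList(
            "/hbase/data/default/" + suffix,            // live store directory
            "/hbase/archive/data/default/" + suffix);   // archive, after a compaction
      }

      public static void main(String[] args) {
        // The same logical file stays readable from the second location after
        // a compaction moves it out of the first one.
        candidateLocations("t1", "1588230740", "cf1", "abc123def")
            .forEach(System.out::println);
      }
    }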
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index d65babc..9c74001 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -306,6 +306,7 @@ public class TestHRegion {
     // Inject our faulty LocalFileSystem
     conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
     user.runAs(new PrivilegedExceptionAction<Object>() {
+      @Override
       public Object run() throws Exception {
         // Make sure it worked (above is sensitive to caching details in hadoop core)
         FileSystem fs = FileSystem.get(conf);
@@ -4087,6 +4088,7 @@ public class TestHRegion {
     // create a primary region, load some data and flush
     // create a secondary region, and do a get against that
     Path rootDir = new Path(dir + "testRegionReplicaSecondary");
+    TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR, rootDir.toString());
 
     byte[][] families = new byte[][] {
         Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")
@@ -4136,6 +4138,7 @@ public class TestHRegion {
     // create a primary region, load some data and flush
     // create a secondary region, and do a put against that
     Path rootDir = new Path(dir + "testRegionReplicaSecondary");
+    TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR, rootDir.toString());
 
     byte[][] families = new byte[][] {
         Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")
@@ -4183,7 +4186,60 @@ public class TestHRegion {
       HRegion.closeHRegion(secondaryRegion);
     }
   }
 
+  @Test
+  public void testCompactionFromPrimary() throws IOException {
+    Path rootDir = new Path(dir + "testRegionReplicaSecondary");
+    TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR, rootDir.toString());
+
+    byte[][] families = new byte[][] {
+        Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")
+    };
+    byte[] cq = Bytes.toBytes("cq");
+    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("testRegionReplicaSecondary"));
+    for (byte[] family : families) {
+      htd.addFamily(new HColumnDescriptor(family));
+    }
+
+    long time = System.currentTimeMillis();
+    HRegionInfo primaryHri = new HRegionInfo(htd.getTableName(),
+        HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
+        false, time, 0);
+    HRegionInfo secondaryHri = new HRegionInfo(htd.getTableName(),
+        HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
+        false, time, 1);
+
+    HRegion primaryRegion = null, secondaryRegion = null;
+    try {
+      primaryRegion = HRegion.createHRegion(primaryHri,
+          rootDir, TEST_UTIL.getConfiguration(), htd);
+
+      // load some data
+      putData(primaryRegion, 0, 1000, cq, families);
+
+      // flush region
+      primaryRegion.flushcache();
+
+      // open secondary region
+      secondaryRegion = HRegion.openHRegion(rootDir, secondaryHri, htd, null, CONF);
+
+      // move the store files of the primary region to the archive, simulating a compaction
+      Collection<StoreFile> storeFiles = primaryRegion.getStore(families[0]).getStorefiles();
+      primaryRegion.getRegionFileSystem().removeStoreFiles(Bytes.toString(families[0]), storeFiles);
+      Collection<StoreFileInfo> storeFileInfos = primaryRegion.getRegionFileSystem().getStoreFiles(families[0]);
+      Assert.assertTrue(storeFileInfos == null || storeFileInfos.size() == 0);
+
+      verifyData(secondaryRegion, 0, 1000, cq, families);
+    } finally {
+      if (primaryRegion != null) {
+        HRegion.closeHRegion(primaryRegion);
+      }
+      if (secondaryRegion != null) {
+        HRegion.closeHRegion(secondaryRegion);
+      }
+    }
+  }
 
   private void putData(int startRow, int numRows, byte[] qf, byte[]... families) throws IOException {
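A note on the repeated one-line test change above: the tests now pin HConstants.HBASE_DIR (hbase.rootdir) to the same rootDir the regions are created under, because resolving a link-backed store file needs the cluster root directory to derive locations such as the archive. A sketch of that derivation, assuming the archive sits in an "archive" subdirectory of the root (RootDirSketch and its method are hypothetical helpers, not HBase API):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    final class RootDirSketch {
      // "hbase.rootdir" is the key behind HConstants.HBASE_DIR; without it set,
      // a link cannot be resolved to an absolute location.
      static Path archiveDir(Configuration conf) {
        Path root = new Path(conf.get("hbase.rootdir"));
        return new Path(root, "archive");  // assumed archive subdirectory name
      }
    }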
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
index da48c49..aabf7ef 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
@@ -36,8 +36,6 @@ import java.util.NavigableSet;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import junit.framework.Assert;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -82,6 +80,7 @@ import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
 import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
 import org.apache.hadoop.util.Progressable;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -201,6 +200,7 @@ public class TestStore {
     // Inject our faulty LocalFileSystem
     conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
     user.runAs(new PrivilegedExceptionAction<Object>() {
+      @Override
       public Object run() throws Exception {
         // Make sure it worked (above is sensitive to caching details in hadoop core)
         FileSystem fs = FileSystem.get(conf);
@@ -1003,6 +1003,7 @@ public class TestStore {
     store.getRegionFileSystem().removeStoreFiles(store.getColumnFamilyName(),
       Lists.newArrayList(sf));
   }
 
+  @Test
   public void testRefreshStoreFiles() throws Exception {
     init(this.name.getMethodName());
@@ -1049,6 +1050,7 @@ public class TestStore {
   }
 
   @SuppressWarnings("unchecked")
+  @Test
   public void testRefreshStoreFilesNotChanged() throws IOException {
     init(this.name.getMethodName());
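Taken together, the pieces above are what testCompactionFromPrimary exercises: the primary's files are moved to the archive, yet the secondary keeps answering reads through its link-backed store files. Reduced to a standalone sketch, the asserted property is an open that falls back across candidate locations (simplified; HFileLink's actual resolution is richer):

    import java.io.FileNotFoundException;
    import java.io.IOException;

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    final class FallbackOpenSketch {
      // Try each location in turn; the first one that still exists wins.
      static FSDataInputStream open(FileSystem fs, Path[] candidates) throws IOException {
        IOException last = null;
        for (Path p : candidates) {
          try {
            return fs.open(p);
          } catch (FileNotFoundException e) {
            last = e;  // moved away (e.g. archived); try the next location
          }
        }
        throw last != null ? last : new FileNotFoundException("no candidate locations");
      }
    }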