Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java	(revision 1582543)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java	(working copy)
@@ -637,25 +637,29 @@
     status.setStatus("Writing region info on filesystem");
     fs.checkRegionInfoOnFilesystem();
 
-    // Remove temporary data left over from old regions
-    status.setStatus("Cleaning up temporary data from old regions");
-    fs.cleanupTempDir();
-
     // Initialize all the HStores
     status.setStatus("Initializing all the Stores");
     long maxSeqId = initializeRegionStores(reporter, status);
 
-    status.setStatus("Cleaning up detritus from prior splits");
-    // Get rid of any splits or merges that were lost in-progress. Clean out
-    // these directories here on open. We may be opening a region that was
-    // being split but we crashed in the middle of it all.
-    fs.cleanupAnySplitDetritus();
-    fs.cleanupMergesDir();
-
     this.writestate.setReadOnly(ServerRegionReplicaUtil.isReadOnly(this));
     this.writestate.flushRequested = false;
     this.writestate.compacting = 0;
 
+    if (this.writestate.writesEnabled) {
+      // Remove temporary data left over from old regions
+      status.setStatus("Cleaning up temporary data from old regions");
+      fs.cleanupTempDir();
+    }
+
+    if (this.writestate.writesEnabled) {
+      status.setStatus("Cleaning up detritus from prior splits");
+      // Get rid of any splits or merges that were lost in-progress. Clean out
+      // these directories here on open. We may be opening a region that was
+      // being split but we crashed in the middle of it all.
+      fs.cleanupAnySplitDetritus();
+      fs.cleanupMergesDir();
+    }
+
     // Initialize split policy
     this.splitPolicy = RegionSplitPolicy.create(this, conf);
 
@@ -753,9 +757,12 @@
       }
     }
     mvcc.initialize(maxMemstoreTS + 1);
-    // Recover any edits if available.
-    maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny(
-        this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));
+
+    if (ServerRegionReplicaUtil.shouldReplayRecoveredEdits(this)) {
+      // Recover any edits if available.
+      maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny(
+          this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));
+    }
     return maxSeqId;
   }
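A note on the two HRegion.java hunks above: a secondary region replica opens against the store files of the primary replica, so the open path must not mutate the region directory. The cleanup calls are therefore gated on writesEnabled (a read-only secondary must not delete temp, split, or merge state that the primary still owns), and recovered edits are replayed only on the default replica, because replay ends in flushes that would write new store files. A condensed sketch of the combined gating, for illustration only (the patch itself keeps the two writesEnabled guards as separate blocks):

    // Sketch only: filesystem mutations happen only when writes are enabled,
    // and recovered-edits replay happens only on the primary replica.
    if (this.writestate.writesEnabled) {
      fs.cleanupTempDir();          // temp data left over from old regions
      fs.cleanupAnySplitDetritus(); // split/merge leftovers from a crash
      fs.cleanupMergesDir();
    }
    if (ServerRegionReplicaUtil.shouldReplayRecoveredEdits(this)) {
      maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny(
          this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));
    }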
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java	(revision 1582543)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java	(working copy)
@@ -192,8 +192,9 @@
     ArrayList<StoreFileInfo> storeFiles = new ArrayList<StoreFileInfo>(files.length);
     for (FileStatus status: files) {
       if (!StoreFileInfo.isValid(status)) continue;
-
-      storeFiles.add(new StoreFileInfo(this.conf, this.fs, status));
+      StoreFileInfo info = ServerRegionReplicaUtil.getStoreFileInfo(conf, fs, regionInfo,
+        regionInfoForFs, familyName, status);
+      storeFiles.add(info);
     }
     return storeFiles;
   }
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java	(revision 1582543)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java	(working copy)
@@ -497,7 +497,7 @@
       completionService.submit(new Callable<StoreFile>() {
         @Override
         public StoreFile call() throws IOException {
-          StoreFile storeFile = createStoreFileAndReader(storeFileInfo.getPath());
+          StoreFile storeFile = createStoreFileAndReader(storeFileInfo);
           return storeFile;
         }
       });
@@ -592,6 +592,10 @@
 
   private StoreFile createStoreFileAndReader(final Path p) throws IOException {
     StoreFileInfo info = new StoreFileInfo(conf, this.getFileSystem(), p);
+    return createStoreFileAndReader(info);
+  }
+
+  private StoreFile createStoreFileAndReader(final StoreFileInfo info) throws IOException {
     info.setRegionCoprocessorHost(this.region.getCoprocessorHost());
     StoreFile storeFile = new StoreFile(this.getFileSystem(), info, this.conf, this.cacheConf,
       this.family.getBloomFilterType());
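With the HRegionFileSystem change, every StoreFileInfo a store loads now comes from ServerRegionReplicaUtil.getStoreFileInfo() (added later in this patch), which returns a link-backed info for secondary replicas. The HStore change exists for the same reason: the old call collapsed the info to a Path and rebuilt a plain StoreFileInfo, silently dropping any HFileLink it carried. The refactor keeps Path-based callers working by delegating, roughly:

    // The Path variant builds a plain info and hands it to the info-based
    // variant, so link-backed infos created elsewhere are no longer lost.
    private StoreFile createStoreFileAndReader(final Path p) throws IOException {
      return createStoreFileAndReader(new StoreFileInfo(conf, this.getFileSystem(), p));
    }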
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java	(revision 1582543)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java	(working copy)
@@ -19,6 +19,7 @@
 
 package org.apache.hadoop.hbase.regionserver;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -35,7 +36,6 @@
 import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.io.HalfStoreFileReader;
 import org.apache.hadoop.hbase.io.Reference;
-import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.util.FSUtils;
 
@@ -133,6 +133,22 @@
   }
 
   /**
+   * Create a Store File Info from an HFileLink
+   * @param conf the {@link Configuration} to use
+   * @param fs The current file system to use.
+   * @param fileStatus The {@link FileStatus} of the file
+   */
+  public StoreFileInfo(final Configuration conf, final FileSystem fs, final FileStatus fileStatus,
+      final HFileLink link)
+      throws IOException {
+    this.conf = conf;
+    this.fileStatus = fileStatus;
+    // HFileLink
+    this.reference = null;
+    this.link = link;
+  }
+
+  /**
    * Sets the region coprocessor env.
    * @param coprocessorHost
    */
@@ -195,6 +211,8 @@
       long length = status.getLen();
       if (this.reference != null) {
         hdfsBlocksDistribution = computeRefFileHDFSBlockDistribution(fs, reference, status);
+      } else if (this.link != null) {
+        hdfsBlocksDistribution = computeHDFSBlocksDistribution(fs);
       } else {
         hdfsBlocksDistribution = FSUtils.computeHDFSBlocksDistribution(fs, status, 0, length);
       }
@@ -226,8 +244,18 @@
     FileStatus status;
     if (this.reference != null) {
       if (this.link != null) {
-        // HFileLink Reference
-        status = link.getFileStatus(fs);
+        FileNotFoundException exToThrow = null;
+        for (int i = 0; i < this.link.getLocations().length; i++) {
+          // HFileLink Reference
+          try {
+            status = link.getFileStatus(fs);
+            return computeRefFileHDFSBlockDistribution(fs, reference, status);
+          } catch (FileNotFoundException ex) {
+            // try the other location
+            exToThrow = ex;
+          }
+        }
+        throw exToThrow;
       } else {
         // HFile Reference
         Path referencePath = getReferredToFile(this.getPath());
@@ -236,8 +264,18 @@
       return computeRefFileHDFSBlockDistribution(fs, reference, status);
     } else {
       if (this.link != null) {
-        // HFileLink
-        status = link.getFileStatus(fs);
+        FileNotFoundException exToThrow = null;
+        for (int i = 0; i < this.link.getLocations().length; i++) {
+          // HFileLink
+          try {
+            status = link.getFileStatus(fs);
+            return FSUtils.computeHDFSBlocksDistribution(fs, status, 0, status.getLen());
+          } catch (FileNotFoundException ex) {
+            // try the other location
+            exToThrow = ex;
+          }
+        }
+        throw exToThrow;
       } else {
         status = this.fileStatus;
       }
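The StoreFileInfo changes guard against a race that is routine for secondaries: between listing a linked file and stat-ing it, the primary may compact and move the file to the archive. The loops retry getFileStatus() once per known link location and rethrow the last FileNotFoundException only when every attempt fails. The generic shape of the pattern, with the exception type and accessors taken from the patch itself:

    // Retry a stat across the candidate locations of a link; give up only
    // when no attempt succeeds. A later attempt may see the file at its
    // new (archived) location.
    FileNotFoundException lastEx = null;
    for (int i = 0; i < link.getLocations().length; i++) {
      try {
        return link.getFileStatus(fs);
      } catch (FileNotFoundException ex) {
        lastEx = ex; // file moved between attempts; try again
      }
    }
    throw lastEx;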
This + * way ensures that the secondary will be able to continue reading the store files even if + * they are moved to archive after compaction + * @throws IOException + */ + public static StoreFileInfo getStoreFileInfo(Configuration conf, FileSystem fs, + HRegionInfo regionInfo, HRegionInfo regionInfoForFs, String familyName, FileStatus status) + throws IOException { + + // if this is a primary region, just return the StoreFileInfo constructed from path + if (regionInfo.equals(regionInfoForFs)) { + return new StoreFileInfo(conf, fs, status); + } + + // else create a store file link. The link file does not exists on filesystem though. + HFileLink link = new HFileLink(conf, + HFileLink.createPath(regionInfoForFs.getTable(), regionInfoForFs.getEncodedName() + , familyName, status.getPath().getName())); + return new StoreFileInfo(conf, fs, status, link); + } + } Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (revision 1582543) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (working copy) @@ -3928,6 +3928,7 @@ // create a primary region, load some data and flush // create a secondary region, and do a get against that Path rootDir = new Path(DIR + "testRegionReplicaSecondary"); + TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR, rootDir.toString()); byte[][] families = new byte[][] { Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3") @@ -3977,6 +3978,7 @@ // create a primary region, load some data and flush // create a secondary region, and do a put against that Path rootDir = new Path(DIR + "testRegionReplicaSecondary"); + TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR, rootDir.toString()); byte[][] families = new byte[][] { Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3") @@ -4024,7 +4026,60 @@ HRegion.closeHRegion(secondaryRegion); } } + } + @Test + public void testCompactionFromPrimary() throws IOException { + Path rootDir = new Path(DIR + "testRegionReplicaSecondary"); + TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR, rootDir.toString()); + + byte[][] families = new byte[][] { + Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3") + }; + byte[] cq = Bytes.toBytes("cq"); + HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("testRegionReplicaSecondary")); + for (byte[] family : families) { + htd.addFamily(new HColumnDescriptor(family)); + } + + long time = System.currentTimeMillis(); + HRegionInfo primaryHri = new HRegionInfo(htd.getTableName(), + HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, + false, time, 0); + HRegionInfo secondaryHri = new HRegionInfo(htd.getTableName(), + HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, + false, time, 1); + + HRegion primaryRegion = null, secondaryRegion = null; + + try { + primaryRegion = HRegion.createHRegion(primaryHri, + rootDir, TEST_UTIL.getConfiguration(), htd); + + // load some data + putData(primaryRegion, 0, 1000, cq, families); + + // flush region + primaryRegion.flushcache(); + + // open secondary region + secondaryRegion = HRegion.openHRegion(rootDir, secondaryHri, htd, null, conf); + + // move the file of the primary region to the archive, simulating a compaction + Collection storeFiles = primaryRegion.getStore(families[0]).getStorefiles(); + primaryRegion.getRegionFileSystem().removeStoreFiles(Bytes.toString(families[0]), 
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java	(revision 1582543)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java	(working copy)
@@ -3928,6 +3928,7 @@
     // create a primary region, load some data and flush
     // create a secondary region, and do a get against that
     Path rootDir = new Path(DIR + "testRegionReplicaSecondary");
+    TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR, rootDir.toString());
 
     byte[][] families = new byte[][] {
         Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")
@@ -3977,6 +3978,7 @@
     // create a primary region, load some data and flush
     // create a secondary region, and do a put against that
     Path rootDir = new Path(DIR + "testRegionReplicaSecondary");
+    TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR, rootDir.toString());
 
     byte[][] families = new byte[][] {
         Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")
@@ -4024,7 +4026,60 @@
         HRegion.closeHRegion(secondaryRegion);
       }
     }
   }
 
+  @Test
+  public void testCompactionFromPrimary() throws IOException {
+    Path rootDir = new Path(DIR + "testRegionReplicaSecondary");
+    TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR, rootDir.toString());
+
+    byte[][] families = new byte[][] {
+        Bytes.toBytes("cf1"), Bytes.toBytes("cf2"), Bytes.toBytes("cf3")
+    };
+    byte[] cq = Bytes.toBytes("cq");
+    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("testRegionReplicaSecondary"));
+    for (byte[] family : families) {
+      htd.addFamily(new HColumnDescriptor(family));
+    }
+
+    long time = System.currentTimeMillis();
+    HRegionInfo primaryHri = new HRegionInfo(htd.getTableName(),
+      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
+      false, time, 0);
+    HRegionInfo secondaryHri = new HRegionInfo(htd.getTableName(),
+      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
+      false, time, 1);
+
+    HRegion primaryRegion = null, secondaryRegion = null;
+
+    try {
+      primaryRegion = HRegion.createHRegion(primaryHri,
+        rootDir, TEST_UTIL.getConfiguration(), htd);
+
+      // load some data
+      putData(primaryRegion, 0, 1000, cq, families);
+
+      // flush region
+      primaryRegion.flushcache();
+
+      // open secondary region
+      secondaryRegion = HRegion.openHRegion(rootDir, secondaryHri, htd, null, conf);
+
+      // move the file of the primary region to the archive, simulating a compaction
+      Collection<StoreFile> storeFiles = primaryRegion.getStore(families[0]).getStorefiles();
+      primaryRegion.getRegionFileSystem().removeStoreFiles(Bytes.toString(families[0]),
+        storeFiles);
+      Collection<StoreFileInfo> storeFileInfos =
+        primaryRegion.getRegionFileSystem().getStoreFiles(families[0]);
+      Assert.assertTrue(storeFileInfos == null || storeFileInfos.size() == 0);
+
+      verifyData(secondaryRegion, 0, 1000, cq, families);
+    } finally {
+      if (primaryRegion != null) {
+        HRegion.closeHRegion(primaryRegion);
+      }
+      if (secondaryRegion != null) {
+        HRegion.closeHRegion(secondaryRegion);
+      }
+    }
+  }
 
   private void putData(int startRow, int numRows, byte[] qf, byte[]... families) throws IOException {
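testCompactionFromPrimary exercises exactly the scenario the HFileLink indirection exists for: write and flush on the primary, open the secondary, then archive the primary's store files with removeStoreFiles() (which archives rather than deletes, making it a reasonable stand-in for a compaction) and assert that the secondary still serves every row. A minimal read-through check in the same spirit (a sketch; the row layout is assumed to match the putData helper this test uses):

    // After the primary's family directory has been emptied, the secondary
    // should still answer point gets through its link-backed store files.
    Get get = new Get(Bytes.toBytes("" + 0));
    Result result = secondaryRegion.get(get);
    Assert.assertFalse(result.isEmpty());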
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java	(revision 1582543)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java	(working copy)
@@ -19,6 +19,12 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
@@ -29,6 +35,7 @@
 import org.apache.hadoop.hbase.catalog.TestMetaReaderEditor;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
@@ -37,6 +44,7 @@
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
+import org.apache.hadoop.util.StringUtils;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -43,6 +51,7 @@
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.mortbay.log.Log;
 
 import com.google.protobuf.ServiceException;
 
@@ -296,4 +305,126 @@
       closeRegion(hriSecondary);
     }
   }
+
+  @Test(timeout = 300000)
+  public void testFlushAndCompactionsInPrimary() throws Exception {
+
+    long runtime = 30 * 1000;
+    // enable store file refreshing
+    final int refreshPeriod = 100; // 100ms refresh is a lot
+    HTU.getConfiguration().setInt("hbase.hstore.compactionThreshold", 3);
+    HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
+      refreshPeriod);
+    // restart the region server so that it starts the refresher chore
+    restartRegionServer();
+    final int startKey = 0, endKey = 1000;
+
+    try {
+      openRegion(hriSecondary);
+
+      // load some data to primary so that reader won't fail
+      HTU.loadNumericRows(table, f, startKey, endKey);
+      HTU.getHBaseAdmin().flush(table.getTableName());
+      // ensure that chore is run
+      Threads.sleep(2 * refreshPeriod);
+
+      final AtomicBoolean running = new AtomicBoolean(true);
+      @SuppressWarnings("unchecked")
+      final AtomicReference<Exception>[] exceptions = new AtomicReference[3];
+      for (int i = 0; i < exceptions.length; i++) {
+        exceptions[i] = new AtomicReference<Exception>();
+      }
+
+      Runnable writer = new Runnable() {
+        int key = startKey;
+        @Override
+        public void run() {
+          try {
+            while (running.get()) {
+              byte[] data = Bytes.toBytes(String.valueOf(key));
+              Put put = new Put(data);
+              put.add(f, null, data);
+              table.put(put);
+              key++;
+              if (key == endKey) key = startKey;
+            }
+          } catch (Exception ex) {
+            Log.warn(ex);
+            exceptions[0].compareAndSet(null, ex);
+          }
+        }
+      };
+
+      Runnable flusherCompactor = new Runnable() {
+        Random random = new Random();
+        @Override
+        public void run() {
+          try {
+            while (running.get()) {
+              // flush or compact
+              if (random.nextBoolean()) {
+                HTU.getHBaseAdmin().flush(table.getTableName());
+              } else {
+                HTU.compact(table.getName(), random.nextBoolean());
+              }
+            }
+          } catch (Exception ex) {
+            Log.warn(ex);
+            exceptions[1].compareAndSet(null, ex);
+          }
+        }
+      };
+
+      Runnable reader = new Runnable() {
+        Random random = new Random();
+        @Override
+        public void run() {
+          try {
+            while (running.get()) {
+              // whether to do a close and open
              if (random.nextInt(10) == 0) {
+                try {
+                  closeRegion(hriSecondary);
+                } catch (Exception ex) {
+                  Log.warn("Failed closing the region " + hriSecondary + " "
+                    + StringUtils.stringifyException(ex));
+                  exceptions[2].compareAndSet(null, ex);
+                }
+                try {
+                  openRegion(hriSecondary);
+                } catch (Exception ex) {
+                  Log.warn("Failed opening the region " + hriSecondary + " "
+                    + StringUtils.stringifyException(ex));
+                  exceptions[2].compareAndSet(null, ex);
+                }
+              }
+
+              int key = random.nextInt(endKey - startKey) + startKey;
+              assertGetRpc(hriSecondary, key, true);
+            }
+          } catch (Exception ex) {
+            Log.warn("Failed getting the value in the region " + hriSecondary + " "
+              + StringUtils.stringifyException(ex));
+            exceptions[2].compareAndSet(null, ex);
+          }
+        }
+      };
+
+      Log.info("Starting writer and reader");
+      ExecutorService executor = Executors.newFixedThreadPool(3);
+      executor.submit(writer);
+      executor.submit(flusherCompactor);
+      executor.submit(reader);
+
+      // wait for threads
+      Threads.sleep(runtime);
+      running.set(false);
+      executor.shutdown();
+      executor.awaitTermination(30, TimeUnit.SECONDS);
+
+      for (AtomicReference<Exception> exRef : exceptions) {
+        Assert.assertNull(exRef.get());
+      }
+    } finally {
+      HTU.deleteNumericRows(table, HConstants.CATALOG_FAMILY, startKey, endKey);
+      closeRegion(hriSecondary);
+    }
+  }
 }
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java	(revision 1582543)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java	(working copy)
@@ -81,6 +81,7 @@
 import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
 import org.apache.hadoop.util.Progressable;
 import org.junit.experimental.categories.Category;
+import org.junit.Test;
 import org.mockito.Mockito;
 
 import com.google.common.collect.Lists;
@@ -917,6 +918,7 @@
     store.getRegionFileSystem().removeStoreFiles(store.getColumnFamilyName(),
       Lists.newArrayList(sf));
   }
 
+  @Test
   public void testRefreshStoreFiles() throws Exception {
     init(this.getName());
 
@@ -963,6 +965,7 @@
   }
 
   @SuppressWarnings("unchecked")
+  @Test
   public void testRefreshStoreFilesNotChanged() throws IOException {
     init(this.getName());
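Two independent test improvements above. The TestRegionReplicas stress test runs a writer, a flusher/compactor, and a reader concurrently for 30 seconds; each thread records its first failure into an AtomicReference slot instead of asserting inline, because an exception thrown inside an ExecutorService task would be swallowed and the test would pass vacuously. The minimal form of that pattern:

    // Worker threads park their first failure; the test thread asserts at
    // the end, after stopping the workers and draining the pool.
    final AtomicReference<Exception> failure = new AtomicReference<Exception>();
    Runnable worker = new Runnable() {
      @Override
      public void run() {
        try {
          // ... loop until told to stop ...
        } catch (Exception ex) {
          failure.compareAndSet(null, ex); // keep only the first failure
        }
      }
    };
    // later, on the test thread: Assert.assertNull(failure.get());

The TestStore hunks are a plain fix: under JUnit 4 a public method without @Test is silently skipped even if its name starts with "test", so the two refresh tests had not been running at all.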
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java	(revision 1582543)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFileRefresherChore.java	(working copy)
@@ -46,6 +46,7 @@
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.StoppableImplementation;
 import org.junit.Assert;
 import org.junit.Before;
@@ -62,6 +63,7 @@
   public void setUp() {
     TEST_UTIL = new HBaseTestingUtility();
     testDir = TEST_UTIL.getDataTestDir("TestStoreFileRefresherChore");
+    TEST_UTIL.getConfiguration().set(HConstants.HBASE_DIR, testDir.toString());
   }
 
   private HTableDescriptor getTableDesc(TableName tableName, byte[]... families) {
@@ -92,7 +94,7 @@
   private HRegion initHRegion(HTableDescriptor htd, byte[] startKey, byte[] stopKey, int replicaId)
       throws IOException {
     Configuration conf = TEST_UTIL.getConfiguration();
-    Path tableDir = new Path(testDir, htd.getTableName().getNameAsString());
+    Path tableDir = FSUtils.getTableDir(testDir, htd.getTableName());
 
     HRegionInfo info = new HRegionInfo(htd.getTableName(), startKey, stopKey, false, 0, replicaId);
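The two changes in this file, and the HConstants.HBASE_DIR lines added to the other tests, serve one requirement (stated here as an assumption drawn from how link resolution behaves): HFileLink locations are computed relative to the configured root directory, and the table directory must follow the standard layout, so a test that opens secondaries has to make both agree with where it actually created the regions:

    // Assumed setup for any test that resolves HFileLinks: the root dir in
    // the conf matches the dir the regions live under, and table dirs use
    // the standard layout instead of an ad-hoc path.
    Configuration conf = TEST_UTIL.getConfiguration();
    conf.set(HConstants.HBASE_DIR, testDir.toString());
    Path tableDir = FSUtils.getTableDir(testDir, htd.getTableName());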