diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index d5c07f0..faa8300 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -775,13 +775,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     this.lastFlushTime = EnvironmentEdgeManager.currentTime();
     // Use maximum of wal sequenceid or that which was found in stores
     // (particularly if no recovered edits, seqid will be -1).
-    long nextSeqid = maxSeqId + 1;
-    if (this.isRecovering) {
-      // In distributedLogReplay mode, we don't know the last change sequence number because region
-      // is opened before recovery completes. So we add a safety bumper to avoid new sequence number
-      // overlaps used sequence numbers
-      nextSeqid = WALSplitter.writeRegionOpenSequenceIdFile(this.fs.getFileSystem(),
-        this.fs.getRegionDir(), nextSeqid, (this.flushPerChanges + 10000000));
+    long nextSeqid = maxSeqId;
+
+    // In distributedLogReplay mode, we don't know the last change sequence number because region
+    // is opened before recovery completes. So we add a safety bumper to avoid new sequence number
+    // overlaps used sequence numbers
+    if (this.writestate.writesEnabled) {
+      nextSeqid = WALSplitter.writeRegionOpenSequenceIdFile(this.fs.getFileSystem(), this.fs
+          .getRegionDir(), nextSeqid, (this.isRecovering ? (this.flushPerChanges + 10000000) : 1));
+    } else {
+      nextSeqid++;
     }
 
     LOG.info("Onlined " + this.getRegionInfo().getShortNameToLog() +
@@ -909,6 +912,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
       getRegionServerServices().getServerName(), storeFiles);
     WALUtil.writeRegionEventMarker(wal, getTableDesc(), getRegionInfo(), regionEventDesc,
       getSequenceId());
+
+    // Store SeqId in HDFS when a region closes
+    // checking region folder exists is due to many tests which delete the table folder while a
+    // table is still online
+    if (this.fs.getFileSystem().exists(this.fs.getRegionDir())) {
+      WALSplitter.writeRegionOpenSequenceIdFile(this.fs.getFileSystem(), this.fs.getRegionDir(),
+        getSequenceId().get(), 0);
+    }
   }
 
   /**
@@ -1290,7 +1301,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     }
 
     status.setStatus("Writing region close event to WAL");
-    if (!abort && wal != null && getRegionServerServices() != null) {
+    if (!abort && wal != null && getRegionServerServices() != null && !writestate.readOnly) {
       writeRegionCloseMarker(wal);
     }
 
@@ -3530,7 +3541,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
         if (firstSeqIdInLog == -1) {
           firstSeqIdInLog = key.getLogSeqNum();
         }
-        currentEditSeqId = key.getLogSeqNum();
+        if (currentEditSeqId > key.getLogSeqNum()) {
+          // when this condition is true, it means we have a serious defect because we need to
+          // maintain increasing SeqId for WAL edits per region
+          LOG.warn("Found decreasing SeqId. PreId=" + currentEditSeqId + " key=" + key +
+            "; edit=" + val);
+        } else {
+          currentEditSeqId = key.getLogSeqNum();
+        }
         currentReplaySeqId = (key.getOrigLogSeqNum() > 0) ?
          key.getOrigLogSeqNum() : currentEditSeqId;
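
The HRegion.java hunks above change region open so that every writable open, not only a distributedLogReplay open, routes its next sequence id through WALSplitter.writeRegionOpenSequenceIdFile; only recovery gets the large safety bumper, while a read-only open merely steps past maxSeqId. A minimal self-contained sketch of that selection rule, assuming no seqid file exists on disk yet (the real method also consults existing files); the class and method names here are illustrative, not patch code:

// Hypothetical distillation of the open-time sequence id rule in the hunks above.
public final class NextSeqIdSketch {
  static long nextSeqId(long maxSeqId, boolean writesEnabled, boolean recovering,
      long flushPerChanges) {
    if (!writesEnabled) {
      // read-only open: no seqid file is persisted, just move past what was seen
      return maxSeqId + 1;
    }
    // writable open: persist a seqid file; recovery needs a wide bump because
    // more edits may still be replayed into this region
    long bump = recovering ? flushPerChanges + 10000000L : 1L;
    return maxSeqId + bump;
  }

  public static void main(String[] args) {
    System.out.println(nextSeqId(41L, true, false, 30000000L));  // 42
    System.out.println(nextSeqId(41L, false, false, 30000000L)); // 42, but no file written
    System.out.println(nextSeqId(41L, true, true, 30000000L));   // 40000041
  }
}

The matching close-side hunk persists getSequenceId().get() with a bumper of 0, so the next open can trust the stored value instead of guessing from store files alone.
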
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
index cc9f898..50532a1 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
@@ -1508,6 +1508,9 @@ public abstract class FSUtils {
       FileStatus[] familyDirs = fs.listStatus(dd, familyFilter);
       for (FileStatus familyDir : familyDirs) {
         Path family = familyDir.getPath();
+        if (family.getName().equals(HConstants.RECOVERED_EDITS_DIR)) {
+          continue;
+        }
         // now in family, iterate over the StoreFiles and
         // put in map
         FileStatus[] familyStatus = fs.listStatus(family);
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index 3f381cc..ee3778b 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -57,6 +57,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -124,6 +125,7 @@ import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.regionserver.wal.WALEditsReplaySink;
 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
+import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
 
 /**
  * This class is responsible for splitting up a bunch of regionserver commit log
@@ -613,6 +615,10 @@
           if (p.getName().endsWith(RECOVERED_LOG_TMPFILE_SUFFIX)) {
             result = false;
           }
+          // Skip SeqId Files
+          if (p.getName().endsWith(SEQUENCE_ID_FILE_SUFFIX)) {
+            result = false;
+          }
         } catch (IOException e) {
           LOG.warn("Failed isFile check on " + p);
         }
@@ -659,7 +665,6 @@
 
   /**
    * Create a file with name as region open sequence id
-   *
    * @param fs
    * @param regiondir
    * @param newSeqId
@@ -670,14 +675,17 @@
   public static long writeRegionOpenSequenceIdFile(final FileSystem fs, final Path regiondir,
       long newSeqId, long saftyBumper) throws IOException {
-    Path editsdir = getRegionDirRecoveredEditsDir(regiondir);
+    Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
     long maxSeqId = 0;
     FileStatus[] files = null;
     if (fs.exists(editsdir)) {
       files = FSUtils.listStatus(fs, editsdir, new PathFilter() {
         @Override
         public boolean accept(Path p) {
-          return isSequenceIdFile(p);
+          if (p.getName().endsWith(SEQUENCE_ID_FILE_SUFFIX)) {
+            return true;
+          }
+          return false;
         }
       });
       if (files != null) {
@@ -685,7 +693,7 @@
         String fileName = status.getPath().getName();
         try {
           Long tmpSeqId = Long.parseLong(fileName.substring(0, fileName.length()
-            - SEQUENCE_ID_FILE_SUFFIX.length()));
+              - SEQUENCE_ID_FILE_SUFFIX.length()));
           maxSeqId = Math.max(tmpSeqId, maxSeqId);
         } catch (NumberFormatException ex) {
           LOG.warn("Invalid SeqId File Name=" + fileName);
@@ -697,15 +705,28 @@
       newSeqId = maxSeqId;
     }
     newSeqId += saftyBumper; // bump up SeqId
-
+    // write a new seqId file
     Path newSeqIdFile = new Path(editsdir, newSeqId + SEQUENCE_ID_FILE_SUFFIX);
-    if (!fs.createNewFile(newSeqIdFile)) {
-      throw new IOException("Failed to create SeqId file:" + newSeqIdFile);
+    if (newSeqId != maxSeqId) {
+      try {
+        if (!fs.createNewFile(newSeqIdFile) && !fs.exists(newSeqIdFile)) {
+          throw new IOException("Failed to create SeqId file:" + newSeqIdFile);
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Written region seqId to file:" + newSeqIdFile + " ,newSeqId=" + newSeqId +
+            " ,maxSeqId=" + maxSeqId);
+        }
+      } catch (FileAlreadyExistsException ignored) {
+        // latest hdfs throws this exception. it's all right if newSeqIdFile already exists
+      }
     }
     // remove old ones
-    if(files != null) {
+    if (files != null) {
       for (FileStatus status : files) {
+        if (newSeqIdFile.equals(status.getPath())) {
+          continue;
+        }
         fs.delete(status.getPath(), false);
       }
     }
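
writeRegionOpenSequenceIdFile scans recovered.edits for names ending in SEQUENCE_ID_FILE_SUFFIX, parses each numeric prefix, takes the larger of the on-disk maximum and the caller's proposal, then adds the safety bumper. A runnable distillation over plain strings; the suffix literal and all names below are illustrative assumptions, not the real constants:

import java.util.Collections;
import java.util.List;

// Stand-in for the seqid bookkeeping in writeRegionOpenSequenceIdFile (names assumed).
public final class SeqIdFileSketch {
  static final String SUFFIX = "_seqid"; // assumed stand-in for SEQUENCE_ID_FILE_SUFFIX

  static long chooseNewSeqId(List<String> fileNames, long proposed, long safetyBumper) {
    long maxSeqId = 0;
    for (String name : fileNames) {
      if (!name.endsWith(SUFFIX)) {
        continue; // not a seqid marker file
      }
      try {
        long id = Long.parseLong(name.substring(0, name.length() - SUFFIX.length()));
        maxSeqId = Math.max(id, maxSeqId);
      } catch (NumberFormatException ex) {
        // the real method warns ("Invalid SeqId File Name=...") and skips the file
      }
    }
    long newSeqId = Math.max(proposed, maxSeqId);
    return newSeqId + safetyBumper; // bump up SeqId
  }

  public static void main(String[] args) {
    // empty dir: max(1, 0) + 1000 = 1001
    System.out.println(chooseNewSeqId(Collections.<String>emptyList(), 1L, 1000L));
    // an existing 1001_seqid beats the proposed 1: max(1, 1001) + 1000 = 2001
    System.out.println(chooseNewSeqId(Collections.singletonList("1001" + SUFFIX), 1L, 1000L));
  }
}

The newSeqId != maxSeqId guard in the hunk means a no-progress call (bumper 0, id unchanged) skips the create entirely, and the cleanup loop now skips newSeqIdFile so a just-written marker is never deleted by its own call.
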
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java
index cbc97e1..8af6016 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestHFileArchiving.java
@@ -145,7 +145,16 @@
     assertTrue(fs.exists(archiveDir));
 
     // check to make sure the store directory was copied
-    FileStatus[] stores = fs.listStatus(archiveDir);
+    // check to make sure the store directory was copied
+    FileStatus[] stores = fs.listStatus(archiveDir, new PathFilter() {
+      @Override
+      public boolean accept(Path p) {
+        if (p.getName().contains(HConstants.RECOVERED_EDITS_DIR)) {
+          return false;
+        }
+        return true;
+      }
+    });
     assertTrue(stores.length == 1);
 
     // make sure we archived the store files
@@ -413,7 +422,15 @@
    * @throws IOException
    */
  private List<String> getAllFileNames(final FileSystem fs, Path archiveDir) throws IOException {
-    FileStatus[] files = FSUtils.listStatus(fs, archiveDir, null);
+    FileStatus[] files = FSUtils.listStatus(fs, archiveDir, new PathFilter() {
+      @Override
+      public boolean accept(Path p) {
+        if (p.getName().contains(HConstants.RECOVERED_EDITS_DIR)) {
+          return false;
+        }
+        return true;
+      }
+    });
     return recurseOnFiles(fs, files, new ArrayList<String>());
   }
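
The anonymous PathFilter above (skip anything named like recovered.edits) recurs in several of the test hunks that follow. A shared constant filter, sketched here as a hypothetical refactor that is not part of the patch, would state the intent once; HConstants.RECOVERED_EDITS_DIR is the same constant the patch already uses:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.HConstants;

// Hypothetical shared filter; the patch inlines this logic at each call site.
public final class SkipRecoveredEditsFilter implements PathFilter {
  public static final SkipRecoveredEditsFilter INSTANCE = new SkipRecoveredEditsFilter();

  private SkipRecoveredEditsFilter() {
  }

  @Override
  public boolean accept(Path p) {
    // reject the recovered.edits directory (and anything named like it)
    return !p.getName().contains(HConstants.RECOVERED_EDITS_DIR);
  }
}

A call site would then read fs.listStatus(archiveDir, SkipRecoveredEditsFilter.INSTANCE).
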
instead has " + files.length, files.length > 1); for (int i = 0; i < files.length; i++) { @@ -842,7 +850,15 @@ public class TestDistributedLogSplitting { WALSplitter.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir, hri.getEncodedName())); LOG.debug("checking edits dir " + editsdir); if(!fs.exists(editsdir)) continue; - FileStatus[] files = fs.listStatus(editsdir); + FileStatus[] files = fs.listStatus(editsdir, new PathFilter() { + @Override + public boolean accept(Path p) { + if (WALSplitter.isSequenceIdFile(p)) { + return false; + } + return true; + } + }); if(files != null) { for(FileStatus file : files) { int c = countWAL(file.getPath(), fs, conf); @@ -1385,11 +1401,10 @@ public class TestDistributedLogSplitting { FileSystem fs = master.getMasterFileSystem().getFileSystem(); Path tableDir = FSUtils.getTableDir(FSUtils.getRootDir(conf), TableName.valueOf("table")); List regionDirs = FSUtils.getRegionDirs(fs, tableDir); + long newSeqId = WALSplitter.writeRegionOpenSequenceIdFile(fs, regionDirs.get(0), 1L, 1000L); WALSplitter.writeRegionOpenSequenceIdFile(fs, regionDirs.get(0) , 1L, 1000L); - // current SeqId file has seqid=1001 - WALSplitter.writeRegionOpenSequenceIdFile(fs, regionDirs.get(0) , 1L, 1000L); - // current SeqId file has seqid=2001 - assertEquals(3001, WALSplitter.writeRegionOpenSequenceIdFile(fs, regionDirs.get(0), 3L, 1000L)); + assertEquals(newSeqId + 2000, + WALSplitter.writeRegionOpenSequenceIdFile(fs, regionDirs.get(0), 3L, 1000L)); Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(regionDirs.get(0)); FileStatus[] files = FSUtils.listStatus(fs, editsdir, new PathFilter() { diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/handler/TestTableDeleteFamilyHandler.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/handler/TestTableDeleteFamilyHandler.java index f56c7a6..ce6abda 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/master/handler/TestTableDeleteFamilyHandler.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/handler/TestTableDeleteFamilyHandler.java @@ -28,6 +28,8 @@ import java.io.IOException; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; @@ -38,6 +40,7 @@ import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.wal.WALSplitter; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; @@ -122,7 +125,15 @@ public class TestTableDeleteFamilyHandler { FileStatus[] fileStatus = fs.listStatus(tableDir); for (int i = 0; i < fileStatus.length; i++) { if (fileStatus[i].isDirectory() == true) { - FileStatus[] cf = fs.listStatus(fileStatus[i].getPath()); + FileStatus[] cf = fs.listStatus(fileStatus[i].getPath(), new PathFilter() { + @Override + public boolean accept(Path p) { + if (p.getName().contains(HConstants.RECOVERED_EDITS_DIR)) { + return false; + } + return true; + } + }); int k = 1; for (int j = 0; j < cf.length; j++) { if (cf[j].isDirectory() == true @@ -149,7 +160,15 @@ public class TestTableDeleteFamilyHandler { fileStatus = fs.listStatus(tableDir); for (int i = 0; i < 
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/handler/TestTableDeleteFamilyHandler.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/handler/TestTableDeleteFamilyHandler.java
index f56c7a6..ce6abda 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/master/handler/TestTableDeleteFamilyHandler.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/handler/TestTableDeleteFamilyHandler.java
@@ -28,6 +28,8 @@ import java.io.IOException;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -38,6 +40,7 @@ import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.WALSplitter;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -122,7 +125,15 @@
     FileStatus[] fileStatus = fs.listStatus(tableDir);
     for (int i = 0; i < fileStatus.length; i++) {
       if (fileStatus[i].isDirectory() == true) {
-        FileStatus[] cf = fs.listStatus(fileStatus[i].getPath());
+        FileStatus[] cf = fs.listStatus(fileStatus[i].getPath(), new PathFilter() {
+          @Override
+          public boolean accept(Path p) {
+            if (p.getName().contains(HConstants.RECOVERED_EDITS_DIR)) {
+              return false;
+            }
+            return true;
+          }
+        });
         int k = 1;
         for (int j = 0; j < cf.length; j++) {
           if (cf[j].isDirectory() == true
@@ -149,7 +160,15 @@
     fileStatus = fs.listStatus(tableDir);
     for (int i = 0; i < fileStatus.length; i++) {
       if (fileStatus[i].isDirectory() == true) {
-        FileStatus[] cf = fs.listStatus(fileStatus[i].getPath());
+        FileStatus[] cf = fs.listStatus(fileStatus[i].getPath(), new PathFilter() {
+          @Override
+          public boolean accept(Path p) {
+            if (WALSplitter.isSequenceIdFile(p)) {
+              return false;
+            }
+            return true;
+          }
+        });
         for (int j = 0; j < cf.length; j++) {
           if (cf[j].isDirectory() == true) {
             assertFalse(cf[j].getPath().getName().equals("cf2"));
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
index f3651ae..f3f2ebe 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -900,8 +901,16 @@
     WALSplitter.splitLogFile(hbaseRootDir, listStatus[0],
         this.fs, this.conf, null, null, null, mode, wals);
     FileStatus[] listStatus1 = this.fs.listStatus(
-        new Path(FSUtils.getTableDir(hbaseRootDir, tableName),
-            new Path(hri.getEncodedName(), "recovered.edits")));
+      new Path(FSUtils.getTableDir(hbaseRootDir, tableName), new Path(hri.getEncodedName(),
+          "recovered.edits")), new PathFilter() {
+        @Override
+        public boolean accept(Path p) {
+          if (WALSplitter.isSequenceIdFile(p)) {
+            return false;
+          }
+          return true;
+        }
+      });
     int editCount = 0;
     for (FileStatus fileStatus : listStatus1) {
       editCount = Integer.parseInt(fileStatus.getPath().getName());
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
index 2b64b97..e263cdb 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -1163,7 +1164,15 @@
     @SuppressWarnings("deprecation")
     Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir,
         Bytes.toString(region.getBytes())));
-    FileStatus [] files = this.fs.listStatus(editsdir);
+    FileStatus[] files = fs.listStatus(editsdir, new PathFilter() {
+      @Override
+      public boolean accept(Path p) {
+        if (WALSplitter.isSequenceIdFile(p)) {
+          return false;
+        }
+        return true;
+      }
+    });
     Path[] paths = new Path[files.length];
     for (int i = 0; i < files.length; i++) {
       paths[i] = files[i].getPath();
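
Several of the test hunks repeat the filter that hides the new seqid marker files from recovered.edits listings; the pattern reduces to a one-line helper. A sketch assuming only APIs the patch itself exercises (FileSystem.listStatus(Path, PathFilter) and the WALSplitter.isSequenceIdFile the tests call); the class and method names are hypothetical:

import java.io.IOException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.wal.WALSplitter;

public final class EditsListing {
  /** Lists a recovered.edits dir, excluding the synthetic seqid marker files. */
  static FileStatus[] listEditsOnly(FileSystem fs, Path editsdir) throws IOException {
    return fs.listStatus(editsdir, new PathFilter() {
      @Override
      public boolean accept(Path p) {
        return !WALSplitter.isSequenceIdFile(p);
      }
    });
  }
}
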