diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 484d5ee..2cb713f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -3950,7 +3950,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     long seqid = minSeqIdForTheRegion;
 
     FileSystem fs = this.fs.getFileSystem();
-    NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(fs, regiondir);
+    List<WALSplitter.RecoveryFileContext> files =
+        WALSplitter.getSplitEditFilesSorted(fs, regiondir, conf);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Found " + (files == null ? 0 : files.size())
         + " recovered edits file(s) under " + regiondir);
@@ -3958,16 +3959,11 @@
 
     if (files == null || files.isEmpty()) return seqid;
 
-    for (Path edits: files) {
-      if (edits == null || !fs.exists(edits)) {
-        LOG.warn("Null or non-existent edits file: " + edits);
-        continue;
-      }
+    long lastMaxSeqId = -1;
+    for (WALSplitter.RecoveryFileContext file: files) {
+      Path edits = file.getPath();
+      long maxSeqId = file.getMaxSeqId();
       if (isZeroLengthThenDelete(fs, edits)) continue;
-
-      long maxSeqId;
-      String fileName = edits.getName();
-      maxSeqId = Math.abs(Long.parseLong(fileName));
       if (maxSeqId <= minSeqIdForTheRegion) {
         if (LOG.isDebugEnabled()) {
           String msg = "Maximum sequenceid for this wal is " + maxSeqId
@@ -3977,11 +3973,19 @@
         }
         continue;
       }
-
+      // A file that does not advance past the previous file's maxSeqId
+      // contains only already-covered edits; skip it wholesale.
+      if (maxSeqId <= lastMaxSeqId) {
+        LOG.info("Skipping replay of " + edits + "; maxSeqId=" + maxSeqId
+            + " <= previously replayed maxSeqId=" + lastMaxSeqId);
+        continue;
+      }
       try {
         // replay the edits. Replay can return -1 if everything is skipped, only update
         // if seqId is greater
-        seqid = Math.max(seqid, replayRecoveredEdits(edits, maxSeqIdInStores, reporter));
+        seqid = Math.max(seqid,
+          replayRecoveredEdits(file, maxSeqIdInStores, reporter, lastMaxSeqId));
+        lastMaxSeqId = maxSeqId;
       } catch (IOException e) {
         boolean skipErrors = conf.getBoolean(
             HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS,
@@ -4019,14 +4023,14 @@
       // column family. Have to fake out file type too by casting our recovered.edits as storefiles
       String fakeFamilyName = WALSplitter.getRegionDirRecoveredEditsDir(regiondir).getName();
       Set<StoreFile> fakeStoreFiles = new HashSet<StoreFile>(files.size());
-      for (Path file: files) {
-        fakeStoreFiles.add(new StoreFile(getRegionFileSystem().getFileSystem(), file, this.conf,
-            null, null));
+      for (WALSplitter.RecoveryFileContext file: files) {
+        fakeStoreFiles.add(new StoreFile(getRegionFileSystem().getFileSystem(),
+            file.getPath(), this.conf, null, null));
       }
       getRegionFileSystem().removeStoreFiles(fakeFamilyName, fakeStoreFiles);
     } else {
-      for (Path file: files) {
-        if (!fs.delete(file, false)) {
+      for (WALSplitter.RecoveryFileContext file: files) {
+        if (!fs.delete(file.getPath(), false)) {
           LOG.error("Failed delete of " + file);
         } else {
           LOG.debug("Deleted recovered.edits file=" + file);
@@ -4045,18 +4049,19 @@
    *         recovered edits log or minSeqId if nothing added from editlogs.
    * @throws IOException
    */
-  private long replayRecoveredEdits(final Path edits,
-      Map<byte[], Long> maxSeqIdInStores, final CancelableProgressable reporter)
+  private long replayRecoveredEdits(final WALSplitter.RecoveryFileContext file,
+      Map<byte[], Long> maxSeqIdInStores, final CancelableProgressable reporter, long lastMaxSeqId)
     throws IOException {
+    Path edits = file.getPath();
     String msg = "Replaying edits from " + edits;
     LOG.info(msg);
     MonitoredTask status = TaskMonitor.get().createStatus(msg);
     FileSystem fs = this.fs.getFileSystem();
 
     status.setStatus("Opening recovered edits");
-    WAL.Reader reader = null;
+    WAL.Reader reader = file.getReader();
+    reader.reset();
     try {
-      reader = WALFactory.createReader(fs, edits, conf);
       long currentEditSeqId = -1;
       long currentReplaySeqId = -1;
       long firstSeqIdInLog = -1;
@@ -4078,7 +4083,12 @@
       while ((entry = reader.next()) != null) {
         WALKey key = entry.getKey();
         WALEdit val = entry.getEdit();
-
+        // Edits up to lastMaxSeqId were already applied from an earlier
+        // overlapping recovered.edits file.
+        if (key.getLogSeqNum() <= lastMaxSeqId) {
+          skippedEdits++;
+          continue;
+        }
         if (ng != null) { // some test, or nonces disabled
           ng.reportOperationFromWal(key.getNonceGroup(), key.getNonce(), key.getWriteTime());
         }
@@ -5043,7 +5053,7 @@
    * @throws IOException
    */
   private static boolean isZeroLengthThenDelete(final FileSystem fs, final Path p)
-    throws IOException {
+      throws IOException {
     FileStatus stat = fs.getFileStatus(p);
     if (stat.getLen() > 0) return false;
     LOG.warn("File " + p + " is zero-length, deleting.");
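The HRegion changes above combine two dedup guards: files arrive from getSplitEditFilesSorted in ascending order of the lowest sequence id they contain, a file whose maxSeqId does not advance past the previous file's maxSeqId is dropped wholesale, and a partially overlapping file has its already-applied prefix filtered entry by entry inside replayRecoveredEdits. A minimal standalone sketch of the file-level rule follows; the EditsFile type and the sample ranges are invented for illustration (plain Java 8, not HBase code):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.List;

    public class FileLevelSkipSketch {
      // Illustrative stand-in for WALSplitter.RecoveryFileContext.
      static final class EditsFile {
        final long minSeqId;
        final long maxSeqId;
        EditsFile(long minSeqId, long maxSeqId) {
          this.minSeqId = minSeqId;
          this.maxSeqId = maxSeqId;
        }
        @Override public String toString() { return "[" + minSeqId + ".." + maxSeqId + "]"; }
      }

      public static void main(String[] args) {
        // The last two files overlap, as left behind by a retried split task.
        List<EditsFile> files = new ArrayList<>(Arrays.asList(
            new EditsFile(1000, 1010), new EditsFile(1011, 1020),
            new EditsFile(1021, 1030), new EditsFile(1021, 1025)));
        // Same ordering getSplitEditFilesSorted establishes.
        files.sort(Comparator.comparingLong(f -> f.minSeqId));
        long lastMaxSeqId = -1;
        for (EditsFile f : files) {
          if (f.maxSeqId <= lastMaxSeqId) {   // same check as the new loop above
            System.out.println("skip   " + f);
            continue;
          }
          System.out.println("replay " + f + ", entries > " + lastMaxSeqId + " only");
          lastMaxSeqId = f.maxSeqId;
        }
      }
    }

Running this prints three "replay" decisions and one "skip" for the fully covered [1021..1025] file.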
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index 04438fd..40184b5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -23,17 +23,16 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.text.ParseException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.NavigableSet;
 import java.util.Set;
 import java.util.TreeMap;
-import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ConcurrentHashMap;
@@ -596,12 +595,12 @@
-   * @return Files in passed regiondir as a sorted set.
+   * @return Files in passed regiondir, sorted by the lowest sequence id each file contains.
    * @throws IOException
    */
-  public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem fs,
-      final Path regiondir) throws IOException {
-    NavigableSet<Path> filesSorted = new TreeSet<Path>();
+  public static List<RecoveryFileContext> getSplitEditFilesSorted(final FileSystem fs,
+      final Path regiondir, Configuration conf) throws IOException {
+    List<RecoveryFileContext> results = new ArrayList<RecoveryFileContext>();
     Path editsdir = getRegionDirRecoveredEditsDir(regiondir);
     if (!fs.exists(editsdir))
-      return filesSorted;
+      return results;
     FileStatus[] files = FSUtils.listStatus(fs, editsdir, new PathFilter() {
       @Override
       public boolean accept(Path p) {
@@ -629,12 +628,75 @@
       }
     });
     if (files == null) {
-      return filesSorted;
+      return results;
     }
     for (FileStatus status : files) {
-      filesSorted.add(status.getPath());
+      if (status.getPath() == null || !fs.exists(status.getPath())) {
+        LOG.warn("Null or non-existent edits file: " + status.getPath());
+        continue;
+      }
+      long minSeqId = -1;
+      WAL.Reader reader = null;
+      if (status.getLen() > 0) {
+        reader = WALFactory.createReader(fs, status.getPath(), conf);
+        WAL.Entry firstEntry = reader.next();
+        if (firstEntry != null) {
+          minSeqId = firstEntry.getKey().getLogSeqNum();
+        }
+      }
+      results.add(new RecoveryFileContext(status.getPath(), minSeqId,
+          extractMaxSeqId(status.getPath().getName()), reader));
+    }
+
+    Collections.sort(results, new Comparator<RecoveryFileContext>() {
+      @Override
+      public int compare(RecoveryFileContext o1, RecoveryFileContext o2) {
+        return Long.compare(o1.getMinSeqId(), o2.getMinSeqId());
+      }
+    });
+
+    return results;
+  }
+
+  /** Recovered.edits files are named by the highest sequence id they contain, zero-padded. */
+  public static long extractMaxSeqId(String filename) {
+    return Math.abs(Long.parseLong(filename));
+  }
+
+  /** A recovered.edits file together with its seqId range and an open reader, if non-empty. */
+  public static class RecoveryFileContext {
+    private final Path path;
+    private final long minSeqId;
+    private final long maxSeqId;
+    private final Reader reader;
+
+    public RecoveryFileContext(Path path, long minSeqId, long maxSeqId, Reader reader) {
+      this.path = path;
+      this.minSeqId = minSeqId;
+      this.maxSeqId = maxSeqId;
+      this.reader = reader;
+    }
+
+    @Override
+    public String toString() {
+      return path + " [minSeqId=" + minSeqId + ", maxSeqId=" + maxSeqId + "]";
+    }
+
+    public Path getPath() {
+      return path;
+    }
+
+    public long getMinSeqId() {
+      return minSeqId;
+    }
+
+    public long getMaxSeqId() {
+      return maxSeqId;
+    }
+
+    public Reader getReader() {
+      return reader;
     }
-    return filesSorted;
   }
 
   /**
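For orientation: a recovered.edits file is named for the highest sequence id it contains, zero-padded to 19 digits, so extractMaxSeqId above can recover maxSeqId with a plain parse (the Math.abs mirrors the old inline Math.abs(Long.parseLong(...)) and guards against negatively named files). The lowest sequence id, by contrast, has to be read from the file's first entry, which is why a WAL.Reader is opened per file. A tiny self-contained demo of the naming round-trip; the %019d format is the same one the new test below uses when it writes files:

    public class SeqIdNameDemo {
      // Same parse as WALSplitter.extractMaxSeqId in the patch above.
      static long extractMaxSeqId(String filename) {
        return Math.abs(Long.parseLong(filename));
      }

      public static void main(String[] args) {
        String name = String.format("%019d", 1030L);
        System.out.println(name);                   // 0000000000000001030
        System.out.println(extractMaxSeqId(name));  // 1030
      }
    }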
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 35de488..b8d003c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -837,6 +837,92 @@
     }
   }
 
+
+  private void createRecoveryEdits(FileSystem fs, WALFactory wals, Path recoveredEditsDir,
+      byte[] row, byte[] family, TableName tableName, byte[] regionName,
+      long minSeqId, long maxSeqId) throws IOException {
+    String filename = String.format("%019d", maxSeqId);
+    Path recoveredEdits = new Path(recoveredEditsDir, filename);
+    fs.create(recoveredEdits);
+    WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
+    for (long i = minSeqId; i <= maxSeqId; i++) {
+      long time = System.nanoTime();
+      WALEdit edit = new WALEdit();
+      edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put,
+          Bytes.toBytes(i)));
+      writer.append(new WAL.Entry(new HLogKey(regionName, tableName, i, time,
+          HConstants.DEFAULT_CLUSTER_ID), edit));
+    }
+    writer.close();
+  }
+
+  private void deleteWAL(FileSystem fs, Path recoveredEditsDir, long minSeqId,
+      long maxSeqId) throws IOException {
+    String filename = String.format("%019d", maxSeqId);
+    fs.delete(new Path(recoveredEditsDir, filename), false);
+  }
+
+  private void validateWAL(FileSystem fs, Path regiondir, long minSeqId, long maxSeqId,
+      byte[] row, byte[] family)
+      throws IOException {
+    Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
+    FileStatus[] files = FSUtils.listStatus(fs, recoveredEditsDir);
+    for (FileStatus file: files) {
+      LOG.info(file.getPath());
+    }
+
+    MonitoredTask status = TaskMonitor.get().createStatus(name.getMethodName());
+    Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+    for (Store store : region.getStores()) {
+      maxSeqIdInStores.put(Bytes.toBytes(store.getColumnFamilyName()), minSeqId - 1);
+    }
+    long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status);
+    assertEquals(maxSeqId, seqId);
+    region.getMVCC().advanceTo(seqId);
+    Get get = new Get(row);
+    Result result = region.get(get);
+    for (long i = minSeqId; i <= maxSeqId; i += 10) {
+      List<Cell> kvs = result.getColumnCells(family, Bytes.toBytes(i));
+      assertEquals(1, kvs.size());
+      assertArrayEquals(Bytes.toBytes(i), CellUtil.cloneValue(kvs.get(0)));
+    }
+  }
+
+  @Test
+  public void testSkipDuplicateEditsReplay() throws Exception {
+    String method = "testSkipDuplicateEditsReplay";
+    TableName tableName = TableName.valueOf(method);
+    byte[] family = Bytes.toBytes("family");
+    this.region = initHRegion(tableName, method, CONF, family);
+    final WALFactory wals = new WALFactory(CONF, null, method);
+    try {
+      Path regiondir = region.getRegionFileSystem().getRegionDir();
+      FileSystem fs = region.getRegionFileSystem().getFileSystem();
+      byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
+
+      Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
+
+      long minSeqId = 1000;
+      createRecoveryEdits(fs, wals, recoveredEditsDir, row, family, tableName, regionName,
+          minSeqId, minSeqId + 10);
+      createRecoveryEdits(fs, wals, recoveredEditsDir, row, family, tableName, regionName,
+          minSeqId + 11, minSeqId + 20);
+      createRecoveryEdits(fs, wals, recoveredEditsDir, row, family, tableName, regionName,
+          minSeqId + 21, minSeqId + 30);
+      createRecoveryEdits(fs, wals, recoveredEditsDir, row, family, tableName, regionName,
+          minSeqId + 21, minSeqId + 25);
+      FileStatus[] files = FSUtils.listStatus(fs, recoveredEditsDir);
+      for (FileStatus file: files) {
+        LOG.info(file.getPath());
+      }
+      validateWAL(fs, regiondir, minSeqId, minSeqId + 30, row, family);
+    } finally {
+      HBaseTestingUtility.closeRegionAndWAL(this.region);
+      this.region = null;
+      wals.close();
+    }
+  }
+
   @Test
   public void testRecoveredEditsReplayCompaction() throws Exception {
     String method = name.getMethodName();
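The test scenario can be restated as an invariant: given files covering [1000..1010], [1011..1020], [1021..1030] and a duplicate [1021..1025], the file-level skip plus the per-entry filter must apply every sequence id in [1000..1030] exactly once, which is what validateWAL's assertEquals(1, kvs.size()) checks per column. A standalone model of that invariant, in illustrative plain Java with no HBase types:

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class ExactlyOnceModel {
      public static void main(String[] args) {
        // {minSeqId, maxSeqId} per recovered.edits file, as created by the test.
        List<long[]> files = new ArrayList<>();
        files.add(new long[] {1000, 1010});
        files.add(new long[] {1011, 1020});
        files.add(new long[] {1021, 1030});
        files.add(new long[] {1021, 1025});
        files.sort(Comparator.comparingLong(f -> f[0]));

        Set<Long> applied = new HashSet<>();
        long lastMax = -1;
        for (long[] f : files) {
          if (f[1] <= lastMax) continue;            // file-level skip
          for (long seq = f[0]; seq <= f[1]; seq++) {
            if (seq <= lastMax) continue;           // per-entry filter
            if (!applied.add(seq)) throw new AssertionError("applied twice: " + seq);
          }
          lastMax = f[1];
        }
        for (long seq = 1000; seq <= 1030; seq++) {
          if (!applied.contains(seq)) throw new AssertionError("missing: " + seq);
        }
        System.out.println("every edit in [1000..1030] applied exactly once");
      }
    }

Note the invariant holds regardless of which of the two files starting at 1021 sorts first: if the shorter one wins the tie, the longer one is replayed from entry 1026 onward; otherwise the shorter one is skipped outright.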
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java
index bfe5d5e..7b02ed9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java
@@ -21,7 +21,7 @@ package org.apache.hadoop.hbase.wal;
 import static org.junit.Assert.*;
 
 import java.io.IOException;
-import java.util.NavigableSet;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -82,20 +82,20 @@ public class TestWALMethods {
     FSUtils.setRootDir(walConf, regiondir);
     (new WALFactory(walConf, null, "dummyLogName")).getWAL(new byte[] {}, null);
 
-    NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(fs, regiondir);
+    List<WALSplitter.RecoveryFileContext> files = WALSplitter.getSplitEditFilesSorted(fs, regiondir, walConf);
     assertEquals(7, files.size());
-    assertEquals(files.pollFirst().getName(), first);
-    assertEquals(files.pollLast().getName(), last);
-    assertEquals(files.pollFirst().getName(),
+    assertEquals(files.get(0).getPath().getName(), first);
+    assertEquals(files.get(files.size() - 1).getPath().getName(), last);
+    assertEquals(files.get(1).getPath().getName(),
         WALSplitter
        .formatRecoveredEditsFileName(0));
-    assertEquals(files.pollFirst().getName(),
+    assertEquals(files.get(2).getPath().getName(),
         WALSplitter
        .formatRecoveredEditsFileName(1));
-    assertEquals(files.pollFirst().getName(),
+    assertEquals(files.get(3).getPath().getName(),
         WALSplitter
        .formatRecoveredEditsFileName(2));
-    assertEquals(files.pollFirst().getName(),
+    assertEquals(files.get(4).getPath().getName(),
         WALSplitter
        .formatRecoveredEditsFileName(11));
   }
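The assertion rewrite above is mechanical: the old NavigableSet<Path> was consumed destructively with pollFirst()/pollLast(), which is why the old test could call pollFirst() repeatedly, whereas the List returned now is read positionally with get(...). A generic side-by-side illustration of the two access patterns (plain strings stand in for RecoveryFileContext):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.NavigableSet;
    import java.util.TreeSet;

    public class AccessPatternDemo {
      public static void main(String[] args) {
        NavigableSet<String> sorted = new TreeSet<>(Arrays.asList(
            "0000000000000000000", "0000000000000000001", "0000000000000000011"));
        List<String> asList = new ArrayList<>(sorted);

        // New style: non-destructive positional reads.
        String first = asList.get(0);
        String last = asList.get(asList.size() - 1);

        // Old style: pollFirst()/pollLast() remove elements as they read them.
        if (!first.equals(sorted.pollFirst()) || !last.equals(sorted.pollLast())) {
          throw new AssertionError("access patterns disagree");
        }
        System.out.println(first + " .. " + last);
      }
    }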