From 1a183b17a0bd356a7506fcae34800780251592cc Mon Sep 17 00:00:00 2001
From: chenheng
Date: Tue, 8 Dec 2015 20:36:27 +0800
Subject: [PATCH] HBASE-14949

---
 .../apache/hadoop/hbase/regionserver/HRegion.java  |  21 ++--
 .../org/apache/hadoop/hbase/wal/WALSplitter.java   | 108 ++++++++++++++++++++-
 .../hadoop/hbase/regionserver/TestHRegion.java     |  94 ++++++++++++++++++
 .../apache/hadoop/hbase/wal/TestWALMethods.java    |  15 +--
 4 files changed, 220 insertions(+), 18 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 484d5ee..8e0cb90 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -3950,7 +3950,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     long seqid = minSeqIdForTheRegion;

     FileSystem fs = this.fs.getFileSystem();
-    NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(fs, regiondir);
+    List<Path> files = WALSplitter.getSplitEditFilesSorted(fs, regiondir);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Found " + (files == null ? 0 : files.size())
         + " recovered edits file(s) under " + regiondir);
@@ -3958,6 +3958,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     if (files == null || files.isEmpty()) return seqid;
+    long lastMaxSeqId = -1;
     for (Path edits: files) {
       if (edits == null || !fs.exists(edits)) {
         LOG.warn("Null or non-existent edits file: " + edits);
         continue;
       }
@@ -3967,7 +3968,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi

       long maxSeqId;
       String fileName = edits.getName();
-      maxSeqId = Math.abs(Long.parseLong(fileName));
+      maxSeqId = WALSplitter.extractMaxSeqId(fileName);
       if (maxSeqId <= minSeqIdForTheRegion) {
         if (LOG.isDebugEnabled()) {
           String msg = "Maximum sequenceid for this wal is " + maxSeqId
@@ -3977,11 +3978,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         }
         continue;
       }
-
+      if (maxSeqId <= lastMaxSeqId) {
+        LOG.info("Skip current WAL because its maxSeqId is not greater than the last WAL's");
+        continue;
+      }
       try {
         // replay the edits. Replay can return -1 if everything is skipped, only update
         // if seqId is greater
-        seqid = Math.max(seqid, replayRecoveredEdits(edits, maxSeqIdInStores, reporter));
+        seqid = Math.max(seqid,
+          replayRecoveredEdits(edits, maxSeqIdInStores, reporter, lastMaxSeqId));
+        lastMaxSeqId = maxSeqId;
       } catch (IOException e) {
         boolean skipErrors = conf.getBoolean(
             HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS,
@@ -4046,7 +4052,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * @throws IOException
    */
   private long replayRecoveredEdits(final Path edits,
-      Map<byte[], Long> maxSeqIdInStores, final CancelableProgressable reporter)
+      Map<byte[], Long> maxSeqIdInStores, final CancelableProgressable reporter, long lastMaxSeqId)
     throws IOException {
     String msg = "Replaying edits from " + edits;
     LOG.info(msg);
@@ -4078,7 +4084,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       while ((entry = reader.next()) != null) {
         WALKey key = entry.getKey();
         WALEdit val = entry.getEdit();
-
+        if (key.getLogSeqNum() <= lastMaxSeqId) {
+          skippedEdits++;
+          continue;
+        }
         if (ng != null) { // some test, or nonces disabled
           ng.reportOperationFromWal(key.getNonceGroup(), key.getNonce(), key.getWriteTime());
         }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index 04438fd..dc4ae30 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -574,7 +574,10 @@ public class WALSplitter {
     return String.format("%019d", seqid);
   }

-  private static final Pattern EDITFILES_NAME_PATTERN = Pattern.compile("-?[0-9]+");
+  private static final Pattern EDITFILES_NAME_PATTERN = Pattern.compile("-?[0-9]+$");
+  public static final String DUP_LOG_SUFFIX = ".dup";
+  private static final Pattern EDITFILES_DUP_NAME_PATTERN =
+      Pattern.compile("-?[0-9]+_[0-9]+" + DUP_LOG_SUFFIX);
   private static final String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp";

   /**
@@ -588,6 +591,30 @@ public class WALSplitter {
   }

   /**
+   * Extract the max sequence id from a recovered edits file name.
+   */
+  public static long extractMaxSeqId(String filename) {
+    long maxSeqId = -1;
+    if (filename.endsWith(DUP_LOG_SUFFIX)) {
+      maxSeqId = Math.abs(Long.parseLong(
+        filename.substring(filename.indexOf("_") + 1, filename.indexOf(WALSplitter.DUP_LOG_SUFFIX))));
+    } else {
+      maxSeqId = Math.abs(Long.parseLong(filename));
+    }
+    return maxSeqId;
+  }
+
+  private static long extractMinSeqId(String filename) {
+    if (filename.endsWith(DUP_LOG_SUFFIX)) {
+      return Math.abs(Long.parseLong(filename.substring(0, filename.indexOf("_"))));
+    }
+    return -1;
+  }
+
+  public static boolean isDupFile(String filename) {
+    return filename.endsWith(DUP_LOG_SUFFIX);
+  }
+  /**
    * Returns sorted set of edit files made by splitter, excluding files
    * with '.temp' suffix.
    *
@@ -594,14 +621,14 @@ public class WALSplitter {
    * @param fs
    * @param regiondir
    * @return Files in passed regiondir as a sorted set.
    * @throws IOException
    */
-  public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem fs,
+  public static List<Path> getSplitEditFilesSorted(final FileSystem fs,
       final Path regiondir) throws IOException {
     NavigableSet<Path> filesSorted = new TreeSet<Path>();
     Path editsdir = getRegionDirRecoveredEditsDir(regiondir);
     if (!fs.exists(editsdir))
-      return filesSorted;
+      return new ArrayList<Path>(filesSorted);
     FileStatus[] files = FSUtils.listStatus(fs, editsdir, new PathFilter() {
       @Override
       public boolean accept(Path p) {
@@ -629,14 +656,85 @@
       }
     });
     if (files == null) {
-      return filesSorted;
+      return new ArrayList<Path>(filesSorted);
     }
     for (FileStatus status : files) {
       filesSorted.add(status.getPath());
     }
-    return filesSorted;
+
+    NavigableSet<Path> dupFilesSorted = new TreeSet<Path>();
+    files = FSUtils.listStatus(fs, editsdir, new PathFilter() {
+      @Override
+      public boolean accept(Path p) {
+        boolean result = false;
+        try {
+          // Return files and only files that match the editfile names pattern.
+          // There can be other files in this directory other than edit files.
+          // In particular, on error, we'll move aside the bad edit file giving
+          // it a timestamp suffix. See moveAsideBadEditsFile.
+          Matcher m = EDITFILES_DUP_NAME_PATTERN.matcher(p.getName());
+          result = fs.isFile(p) && m.matches();
+          // Skip the file whose name ends with RECOVERED_LOG_TMPFILE_SUFFIX,
+          // because it means a splitwal thread is writing this file.
+          if (p.getName().endsWith(RECOVERED_LOG_TMPFILE_SUFFIX)) {
+            result = false;
+          }
+          // Skip SeqId Files
+          if (isSequenceIdFile(p)) {
+            result = false;
+          }
+        } catch (IOException e) {
+          LOG.warn("Failed isFile check on " + p);
+        }
+        return result;
+      }
+    });
+    if (files == null) {
+      return new ArrayList<Path>(filesSorted);
+    }
+    for (FileStatus status : files) {
+      dupFilesSorted.add(status.getPath());
+    }
+
+    List<Path> merge = new LinkedList<Path>(filesSorted);
+
+    for (Path dup : dupFilesSorted) {
+      insertInto(dup, merge);
+    }
+
+    return merge;
   }
+
+  private static void insertInto(Path dup, List<Path> wals) {
+    long minSeqDupId = extractMinSeqId(dup.getName());
+    long maxSeqDupId = extractMaxSeqId(dup.getName());
+    long lastMaxId = -1;
+    for (int i = 0, n = wals.size(); i < n; i++) {
+      String filename = wals.get(i).getName();
+      long maxId = extractMaxSeqId(filename);
+      if (isDupFile(filename)) {
+        lastMaxId = maxId;
+        continue;
+      }
+      if (hasOverLap(lastMaxId + 1, maxId, minSeqDupId, maxSeqDupId)) {
+        wals.add(i + 1, dup);
+        return;
+      }
+      lastMaxId = maxId;
+    }
+  }
+
+  private static boolean hasOverLap(long minId, long maxId, long minDupId, long maxDupId) {
+    if (minDupId >= minId && minDupId <= maxId) {
+      return true;
+    }
+    if (minDupId == maxId + 1) {
+      return true;
+    }
+    return false;
+  }
+

   /**
    * Move aside a bad edits file.
    *
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 35de488..832905f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -837,6 +837,100 @@
     }
   }

+  private String generateName(long minSeqId, long maxSeqId) {
+    return String.format("%019d_%019d", minSeqId, maxSeqId) + WALSplitter.DUP_LOG_SUFFIX;
+  }
+  private void createWAL(FileSystem fs, WALFactory wals, Path recoveredEditsDir,
+      byte[] row, byte[] family, byte[] regionName,
+      long minSeqId, long maxSeqId, boolean isDup) throws IOException {
+    String filename = String.format("%019d", maxSeqId);
+    if (isDup) {
+      filename = generateName(minSeqId, maxSeqId);
+    }
+    Path recoveredEdits = new Path(recoveredEditsDir, filename);
+    fs.create(recoveredEdits);
+    WALProvider.Writer writer = wals.createRecoveredEditsWriter(fs, recoveredEdits);
+    for (long i = minSeqId; i <= maxSeqId; i++) {
+      long time = System.nanoTime();
+      WALEdit edit = new WALEdit();
+      edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes
+          .toBytes(i)));
+      writer.append(new WAL.Entry(new HLogKey(regionName, tableName, i, time,
+          HConstants.DEFAULT_CLUSTER_ID), edit));
+    }
+    writer.close();
+  }
+
+  private void deleteWAL(FileSystem fs, Path recoveredEditsDir, long minSeqId,
+      long maxSeqId) throws IOException {
+    String filename = String.format("%019d", maxSeqId);
+    fs.delete(new Path(recoveredEditsDir, filename), false);
+    filename = generateName(minSeqId, maxSeqId);
+    fs.delete(new Path(recoveredEditsDir, filename), false);
+  }
+
+  private void validateWAL(FileSystem fs, Path regiondir, long minSeqId, long maxSeqId,
+      byte[] row, byte[] family)
+      throws IOException {
+    Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
+    FileStatus[] files = FSUtils.listStatus(fs, recoveredEditsDir);
+    for (FileStatus file: files) {
+      LOG.info(file.getPath());
+    }
+
+    MonitoredTask status = TaskMonitor.get().createStatus(method);
+    Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+    for (Store store : region.getStores()) {
+      maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(), minSeqId - 1);
+    }
+    long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status);
+    assertEquals(maxSeqId, seqId);
+    region.getMVCC().advanceTo(seqId);
+    Get get = new Get(row);
+    Result result = region.get(get);
+    for (long i = minSeqId; i <= maxSeqId; i += 10) {
+      List<Cell> kvs = result.getColumnCells(family, Bytes.toBytes(i));
+      assertEquals(1, kvs.size());
+      assertArrayEquals(Bytes.toBytes(i), CellUtil.cloneValue(kvs.get(0)));
+    }
+  }
+
+  @Test
+  public void testSkipDuplicateEditsReplay() throws Exception {
+    String method = "testSkipDuplicateEditsReplay";
+    TableName tableName = TableName.valueOf(method);
+    byte[] family = Bytes.toBytes("family");
+    this.region = initHRegion(tableName, method, CONF, family);
+    final WALFactory wals = new WALFactory(CONF, null, method);
+    try {
+      Path regiondir = region.getRegionFileSystem().getRegionDir();
+      FileSystem fs = region.getRegionFileSystem().getFileSystem();
+      byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
+
+      Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
+
+      long minSeqId = 1000;
+      // wals list is 00010, 00011_00020.dup, 00030, 00021_00025.dup
+      createWAL(fs, wals, recoveredEditsDir, row, family, regionName, minSeqId,
+          minSeqId + 10, false);
+      createWAL(fs, wals, recoveredEditsDir, row, family, regionName, minSeqId + 11,
+          minSeqId + 20, true);
+      createWAL(fs, wals, recoveredEditsDir, row, family, regionName, minSeqId + 21,
+          minSeqId + 30, false);
+      createWAL(fs, wals, recoveredEditsDir, row, family, regionName, minSeqId + 21,
+          minSeqId + 25, true);
+      FileStatus[] files = FSUtils.listStatus(fs, recoveredEditsDir);
+      for (FileStatus file: files) {
+        LOG.info(file.getPath());
+      }
+      validateWAL(fs, regiondir, minSeqId, minSeqId + 30, row, family);
+    } finally {
+      HBaseTestingUtility.closeRegionAndWAL(this.region);
+      this.region = null;
+      wals.close();
+    }
+  }
+
   @Test
   public void testRecoveredEditsReplayCompaction() throws Exception {
     String method = name.getMethodName();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java
index bfe5d5e..f68facf 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.wal;
 import static org.junit.Assert.*;

 import java.io.IOException;
+import java.util.List;
 import java.util.NavigableSet;

 import org.apache.hadoop.conf.Configuration;
@@ -82,20 +83,20 @@
     FSUtils.setRootDir(walConf, regiondir);
     (new WALFactory(walConf, null, "dummyLogName")).getWAL(new byte[] {}, null);

-    NavigableSet<Path> files = WALSplitter.getSplitEditFilesSorted(fs, regiondir);
+    List<Path> files = WALSplitter.getSplitEditFilesSorted(fs, regiondir);
     assertEquals(7, files.size());
-    assertEquals(files.pollFirst().getName(), first);
-    assertEquals(files.pollLast().getName(), last);
-    assertEquals(files.pollFirst().getName(),
+    assertEquals(files.get(0).getName(), first);
+    assertEquals(files.get(files.size() - 1).getName(), last);
+    assertEquals(files.get(1).getName(),
         WALSplitter
         .formatRecoveredEditsFileName(0));
-    assertEquals(files.pollFirst().getName(),
+    assertEquals(files.get(2).getName(),
         WALSplitter
         .formatRecoveredEditsFileName(1));
-    assertEquals(files.pollFirst().getName(),
+    assertEquals(files.get(3).getName(),
         WALSplitter
         .formatRecoveredEditsFileName(2));
-    assertEquals(files.pollFirst().getName(),
+    assertEquals(files.get(4).getName(),
         WALSplitter
         .formatRecoveredEditsFileName(11));
   }
-- 
1.9.3 (Apple Git-50)
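
Reviewer's note (not part of the patch, placed after the signature so git am ignores it): the standalone sketch below mirrors the insertInto()/hasOverLap() ordering rule the patch adds to WALSplitter, using plain file-name strings instead of Paths. The class name and sample sequence ids are invented for illustration; the name formats are the patch's %019d and %019d_%019d.dup. It reproduces the ordering asserted in testSkipDuplicateEditsReplay.

import java.util.ArrayList;
import java.util.List;

public class DupOrderingSketch {
  static final String DUP = ".dup";

  // Max seq id is the whole name for a regular edits file,
  // or the part after '_' for a .dup file.
  static long maxSeqId(String name) {
    return name.endsWith(DUP)
        ? Long.parseLong(name.substring(name.indexOf('_') + 1, name.indexOf(DUP)))
        : Long.parseLong(name);
  }

  // Min seq id only exists for .dup files (the part before '_').
  static long minSeqId(String name) {
    return name.endsWith(DUP) ? Long.parseLong(name.substring(0, name.indexOf('_'))) : -1;
  }

  // Insert the dup after the first regular file whose covered range
  // [lastMax + 1, max] contains, or immediately precedes, the dup's min seq id.
  static void insertInto(String dup, List<String> wals) {
    long minDup = minSeqId(dup);
    long lastMax = -1;
    for (int i = 0, n = wals.size(); i < n; i++) {
      String name = wals.get(i);
      long max = maxSeqId(name);
      if (name.endsWith(DUP)) {
        lastMax = max;
        continue;
      }
      if ((minDup >= lastMax + 1 && minDup <= max) || minDup == max + 1) {
        wals.add(i + 1, dup);
        return;
      }
      lastMax = max;
    }
  }

  public static void main(String[] args) {
    List<String> wals = new ArrayList<>();
    wals.add(String.format("%019d", 1010));  // regular file, edits up to seq id 1010
    wals.add(String.format("%019d", 1030));  // regular file, edits up to seq id 1030
    insertInto(String.format("%019d_%019d", 1011, 1020) + DUP, wals);
    insertInto(String.format("%019d_%019d", 1021, 1025) + DUP, wals);
    // Prints the replay order: 1010, 1011_1020.dup, 1030, 1021_1025.dup
    for (String w : wals) {
      System.out.println(w);
    }
  }
}

The key property: a .dup file sorts immediately after the regular file whose sequence range overlaps it, and replayRecoveredEdits() then skips every entry at or below lastMaxSeqId, so duplicated edits (here, 1021-1025, already covered by the 1030 file) are replayed at most once.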