From 5a750fe69889f960b6d92fecfc6d41efe4d1ec29 Mon Sep 17 00:00:00 2001
From: chenheng
Date: Fri, 11 Dec 2015 19:24:12 +0800
Subject: [PATCH] HBASE-14949 Skip duplicate entries when replay WAL

---
 .../org/apache/hadoop/hbase/wal/WALSplitter.java   |  39 ++++++--
 .../apache/hadoop/hbase/wal/TestWALFactory.java    | 102 +++++++++++++++++++++
 2 files changed, 135 insertions(+), 6 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index 04438fd..590e940 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -1350,30 +1350,57 @@ public class WALSplitter {
           Path dst = getCompletedRecoveredEditsFilePath(wap.p,
             regionMaximumEditLogSeqNum.get(writersEntry.getKey()));
           try {
+            boolean shouldFlush = true;
             if (!dst.equals(wap.p) && fs.exists(dst)) {
-              LOG.warn("Found existing old edits file. It could be the "
+              // Compare the minSeqId of the current temp file with the minSeqId
+              // of the first entry in the existing file of the same name.
+              long minSeqIdInSameFile = -1;
+              if (fs.getFileStatus(dst).getLen() > 0) {
+                WAL.Reader reader = null;
+                try {
+                  reader = WALFactory.createReader(fs, dst, conf);
+                  WAL.Entry firstEntry = reader.next();
+                  minSeqIdInSameFile = firstEntry.getKey().getLogSeqNum();
+                } finally {
+                  if (reader != null) {
+                    reader.close();
+                  }
+                }
+              }
+              String tempFilename = wap.p.getName();
+              long minSeqId = Long.parseLong(tempFilename.substring(0,
+                tempFilename.indexOf(RECOVERED_LOG_TMPFILE_SUFFIX)));
+              // If the current minSeqId is less than the minSeqId in the file of
+              // the same name, this buffer holds more entries, so delete that file.
+              // Otherwise the entries were already flushed; do not flush this buffer.
+              if (minSeqId < minSeqIdInSameFile) {
+                LOG.warn("Found existing old edits file. It could be the "
                   + "result of a previous failed split attempt. Deleting " + dst + ", length="
                   + fs.getFileStatus(dst).getLen());
-              if (!fs.delete(dst, false)) {
-                LOG.warn("Failed deleting of old " + dst);
-                throw new IOException("Failed deleting of old " + dst);
+                if (!fs.delete(dst, false)) {
+                  LOG.warn("Failed deleting of old " + dst);
+                  throw new IOException("Failed deleting of old " + dst);
+                }
+              } else {
+                LOG.info("Current entries in buffer have already been flushed into " + dst);
+                shouldFlush = false;
               }
             }
             // Skip the unit tests which create a splitter that reads and
             // writes the data without touching disk.
             // TestHLogSplit#testThreading is an example.
-            if (fs.exists(wap.p)) {
+            if (shouldFlush && fs.exists(wap.p)) {
               if (!fs.rename(wap.p, dst)) {
                 throw new IOException("Failed renaming " + wap.p + " to " + dst);
               }
               LOG.info("Rename " + wap.p + " to " + dst);
+              paths.add(dst);
             }
           } catch (IOException ioe) {
             LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe);
             thrown.add(ioe);
             return null;
           }
-          paths.add(dst);
           return null;
         }
       });
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
index 747977a..4c1ee20 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
@@ -209,6 +209,108 @@ public class TestWALFactory {
     verifySplits(splits, howmany);
   }
 
+  @Test
+  public void testDuplicateWALSplit() throws IOException {
+    final TableName tableName = TableName.valueOf(currentTest.getMethodName());
+    final byte [] rowName = tableName.getName();
+    final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
+    final Path logdir = new Path(hbaseDir,
+        DefaultWALProvider.getWALDirectoryName(currentTest.getMethodName()));
+    Path oldLogDir = new Path(hbaseDir, HConstants.HREGION_OLDLOGDIR_NAME);
+
+    Path tabledir = FSUtils.getTableDir(hbaseDir, tableName);
+    fs.mkdirs(tabledir);
+    HRegionInfo infos = new HRegionInfo(tableName,
+        Bytes.toBytes("" + 0), Bytes.toBytes("" + (100)), false);
+    fs.mkdirs(new Path(tabledir, infos.getEncodedName()));
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    htd.addFamily(new HColumnDescriptor("column"));
+    int editsNum = 10;
+    final WAL log =
+        wals.getWAL(infos.getEncodedNameAsBytes(), infos.getTable().getNamespace());
+    for (int j = 0; j < editsNum; j++) {
+      WALEdit edit = new WALEdit();
+      byte [] family = Bytes.toBytes("column");
+      byte [] qualifier = Bytes.toBytes(Integer.toString(j));
+      byte [] column = Bytes.toBytes("column:" + Integer.toString(j));
+      edit.add(new KeyValue(rowName, family, qualifier,
+          System.currentTimeMillis(), column));
+      WALKey walKey = new WALKey(infos.getEncodedNameAsBytes(), tableName,
+          System.currentTimeMillis(), mvcc);
+      log.append(htd, infos, walKey, edit, true);
+      walKey.getWriteEntry();
+    }
+    log.sync();
+
+    Path walPath = DefaultWALProvider.getCurrentFileName(log);
+    int count = mockDuplicateWAL(walPath, logdir);
+    assertEquals(editsNum - 1, count);
+    wals.shutdown();
+
+    FileStatus[] files = FSUtils.listStatus(fs, logdir);
+    assertEquals(2, files.length);
+
+    List<Path> splits = WALSplitter.split(hbaseDir, logdir, oldLogDir, fs, conf, wals);
+    assertEquals(1, splits.size());
+    for (int i = 0; i < splits.size(); i++) {
+      LOG.info("Verifying=" + splits.get(i));
+      WAL.Reader reader = wals.createReader(fs, splits.get(i));
+      try {
+        count = 0;
+        String previousRegion = null;
+        long seqno = -1;
+        WAL.Entry entry = new WAL.Entry();
+        while ((entry = reader.next(entry)) != null) {
+          WALKey key = entry.getKey();
+          String region = Bytes.toString(key.getEncodedRegionName());
+          // Assert that all edits are for the same region.
+          if (previousRegion != null) {
+            assertEquals(previousRegion, region);
+          }
+          LOG.info("oldseqno=" + seqno + ", newseqno=" + key.getLogSeqNum());
+          assertTrue(seqno < key.getLogSeqNum());
+          seqno = key.getLogSeqNum();
+          previousRegion = region;
+          count++;
+        }
+        assertEquals(editsNum, count);
+      } finally {
+        reader.close();
+      }
+    }
+  }
+
+  /**
+   * Mock a duplicate of the given WAL by skipping its first entry: if the
+   * source WAL holds entries [1, 2, 3, 4, 5], the mocked WAL will hold
+   * [2, 3, 4, 5].
+   */
+  private int mockDuplicateWAL(Path wal, Path logdir) throws IOException {
+    WAL.Reader reader = null;
+    DefaultWALProvider.Writer writer =
+        DefaultWALProvider.createWriter(conf, fs, new Path(logdir,
+            currentTest.getMethodName() + "." + System.currentTimeMillis()), false);
+    WAL.Entry entry = null;
+    int count = 0;
+    try {
+      reader = wals.createReader(fs, wal);
+      // Skip the first entry.
+      reader.next();
+      while ((entry = reader.next()) != null) {
+        writer.append(entry);
+        count++;
+      }
+      writer.sync();
+    } finally {
+      if (reader != null) {
+        reader.close();
+      }
+      writer.close();
+    }
+    return count;
+  }
+
+
   /**
    * Test new HDFS-265 sync.
    * @throws Exception
-- 
1.9.3 (Apple Git-50)
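
Note (not part of the patch, ignored by git am): the heart of this change is a
single comparison: the minimum sequence id encoded in the temp file's name
versus the sequence id of the first entry in the already-completed edits file
of the same name. The sketch below models that rule in isolation so it can be
read and run without an HBase checkout. The class and method names
(RecoveredEditsDedup, shouldFlush) and the ".temp" suffix constant are
illustrative stand-ins, not HBase API.

public class RecoveredEditsDedup {
  // Stand-in for WALSplitter.RECOVERED_LOG_TMPFILE_SUFFIX.
  static final String TMP_SUFFIX = ".temp";

  /**
   * Decides whether a temp recovered-edits file should replace an existing
   * completed file of the same name.
   *
   * @param tempFileName name like "00000000000000000005.temp"; the numeric
   *                     prefix is the minimum sequence id buffered in it
   * @param minSeqIdInExisting sequence id of the first entry in the existing
   *                           completed file, or -1 if that file is empty
   * @return true to delete the existing file and rename the temp file over
   *         it; false when the buffered entries were already flushed
   */
  static boolean shouldFlush(String tempFileName, long minSeqIdInExisting) {
    long minSeqId = Long.parseLong(
        tempFileName.substring(0, tempFileName.indexOf(TMP_SUFFIX)));
    // A strictly smaller minSeqId means this split attempt recovered more
    // (earlier) entries than the previous attempt, so its output wins.
    return minSeqId < minSeqIdInExisting;
  }

  public static void main(String[] args) {
    // Previous attempt starts at seqid 2, this one at 1: replace the old file.
    System.out.println(shouldFlush("00000000000000000001.temp", 2)); // true
    // Previous attempt already starts at 1: the buffer is a duplicate, skip it.
    System.out.println(shouldFlush("00000000000000000002.temp", 1)); // false
  }
}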