diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index e0d8b00..10dd204 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -3064,7 +3064,8 @@ public class HRegion implements HeapSize { // , Writable{ } try { - seqid = replayRecoveredEdits(edits, maxSeqIdInStores, reporter); + // replay the edits. Replay can return -1 if everything is skipped, only update if seqId is greater + seqid = Math.max(seqid, replayRecoveredEdits(edits, maxSeqIdInStores, reporter)); } catch (IOException e) { boolean skipErrors = conf.getBoolean( HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS, @@ -3191,6 +3192,7 @@ public class HRegion implements HeapSize { // , Writable{ if (firstSeqIdInLog == -1) { firstSeqIdInLog = key.getLogSeqNum(); } + currentEditSeqId = key.getLogSeqNum(); boolean flush = false; for (KeyValue kv: val.getKeyValues()) { // Check this edit is for me. Also, guard against writing the special @@ -3225,7 +3227,6 @@ public class HRegion implements HeapSize { // , Writable{ skippedEdits++; continue; } - currentEditSeqId = key.getLogSeqNum(); // Once we are over the limit, restoreEdit will keep returning true to // flush -- but don't flush until we've played all the kvs that make up // the WALEdit. diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index c4238b7..d89813c 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -133,6 +133,7 @@ import org.junit.rules.TestName; import org.mockito.Mockito; import com.google.common.collect.Lists; +import com.google.protobuf.ByteString; /** * Basic stand-alone testing of HRegion. @@ -414,6 +415,67 @@ public class TestHRegion { } @Test + public void testSkipRecoveredEditsReplayTheLastFileIgnored() throws Exception { + String method = "testSkipRecoveredEditsReplayTheLastFileIgnored"; + TableName tableName = TableName.valueOf(method); + byte[] family = Bytes.toBytes("family"); + this.region = initHRegion(tableName, method, conf, family); + try { + Path regiondir = region.getRegionFileSystem().getRegionDir(); + FileSystem fs = region.getRegionFileSystem().getFileSystem(); + byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes(); + + assertEquals(0, region.getStoreFileList( + region.getStores().keySet().toArray(new byte[0][])).size()); + + Path recoveredEditsDir = HLogUtil.getRegionDirRecoveredEditsDir(regiondir); + + long maxSeqId = 1050; + long minSeqId = 1000; + + for (long i = minSeqId; i <= maxSeqId; i += 10) { + Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i)); + fs.create(recoveredEdits); + HLog.Writer writer = HLogFactory.createRecoveredEditsWriter(fs, recoveredEdits, conf); + + long time = System.nanoTime(); + WALEdit edit = null; + if (i == maxSeqId) { + edit = WALEdit.createCompaction(CompactionDescriptor.newBuilder() + .setTableName(ByteString.copyFrom(tableName.getName())) + .setFamilyName(ByteString.copyFrom(regionName)) + .setEncodedRegionName(ByteString.copyFrom(regionName)) + .setStoreHomeDirBytes(ByteString.copyFrom(Bytes.toBytes(regiondir.toString()))) + .build()); + } else { + edit = new WALEdit(); + edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes + .toBytes(i))); + } + writer.append(new HLog.Entry(new HLogKey(regionName, tableName, i, time, + HConstants.DEFAULT_CLUSTER_ID), edit)); + writer.close(); + } + + long recoverSeqId = 1030; + Map maxSeqIdInStores = new TreeMap(Bytes.BYTES_COMPARATOR); + MonitoredTask status = TaskMonitor.get().createStatus(method); + for (Store store : region.getStores().values()) { + maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(), recoverSeqId - 1); + } + long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status); + assertEquals(maxSeqId, seqId); + + // assert that the files are flushed + assertEquals(1, region.getStoreFileList( + region.getStores().keySet().toArray(new byte[0][])).size()); + + } finally { + HRegion.closeHRegion(this.region); + this.region = null; + } } + + @Test public void testRecoveredEditsReplayCompaction() throws Exception { String method = name.getMethodName(); TableName tableName = TableName.valueOf(method);