diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 1cec378..14ab2cb 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -3081,7 +3081,8 @@ public class HRegion implements HeapSize { // , Writable{
       }
 
       try {
-        seqid = replayRecoveredEdits(edits, maxSeqIdInStores, reporter);
+        // replay the edits. Replay can return -1 if everything is skipped, only update if seqId is greater
+        seqid = Math.max(seqid, replayRecoveredEdits(edits, maxSeqIdInStores, reporter));
      } catch (IOException e) {
        boolean skipErrors = conf.getBoolean(
            HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS,
@@ -3208,6 +3209,7 @@ public class HRegion implements HeapSize { // , Writable{
        if (firstSeqIdInLog == -1) {
          firstSeqIdInLog = key.getLogSeqNum();
        }
+        currentEditSeqId = key.getLogSeqNum();
        boolean flush = false;
        for (KeyValue kv: val.getKeyValues()) {
          // Check this edit is for me. Also, guard against writing the special
@@ -3242,7 +3244,6 @@ public class HRegion implements HeapSize { // , Writable{
            skippedEdits++;
            continue;
          }
-          currentEditSeqId = key.getLogSeqNum();
          // Once we are over the limit, restoreEdit will keep returning true to
          // flush -- but don't flush until we've played all the kvs that make up
          // the WALEdit.
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 66c08ed..6cd7ddf 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -140,10 +140,11 @@ import org.junit.rules.TestName;
 import org.mockito.Mockito;
 
 import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
 
 /**
  * Basic stand-alone testing of HRegion. No clusters!
- * 
+ *
  * A lot of the meta information for an HRegion now lives inside other HRegions
  * or in the HBaseMaster, so only basic testing is possible.
  */
@@ -209,7 +210,7 @@ public class TestHRegion {
   * case previous flush fails and leaves some data in snapshot. The bug could cause loss of data
   * in current memstore. The fix is removing all conditions except abort check so we ensure 2
   * flushes for region close."
-   * @throws IOException 
+   * @throws IOException
   */
  @Test (timeout=60000)
  public void testCloseCarryingSnapshot() throws IOException {
@@ -258,6 +259,7 @@ public class TestHRegion {
    // Inject our faulty LocalFileSystem
    conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
    user.runAs(new PrivilegedExceptionAction() {
+      @Override
      public Object run() throws Exception {
        // Make sure it worked (above is sensitive to caching details in hadoop core)
        FileSystem fs = FileSystem.get(conf);
@@ -318,6 +320,7 @@ public class TestHRegion {
    // Inject our faulty LocalFileSystem
    conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
    user.runAs(new PrivilegedExceptionAction() {
+      @Override
      public Object run() throws Exception {
        // Make sure it worked (above is sensitive to caching details in hadoop core)
        FileSystem fs = FileSystem.get(conf);
@@ -575,6 +578,67 @@ public class TestHRegion {
  }
 
  @Test
+  public void testSkipRecoveredEditsReplayTheLastFileIgnored() throws Exception {
+    String method = "testSkipRecoveredEditsReplayTheLastFileIgnored";
+    TableName tableName = TableName.valueOf(method);
+    byte[] family = Bytes.toBytes("family");
+    this.region = initHRegion(tableName, method, CONF, family);
+    try {
+      Path regiondir = region.getRegionFileSystem().getRegionDir();
+      FileSystem fs = region.getRegionFileSystem().getFileSystem();
+      byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
+
+      assertEquals(0, region.getStoreFileList(
+        region.getStores().keySet().toArray(new byte[0][])).size());
+
+      Path recoveredEditsDir = HLogUtil.getRegionDirRecoveredEditsDir(regiondir);
+
+      long maxSeqId = 1050;
+      long minSeqId = 1000;
+
+      for (long i = minSeqId; i <= maxSeqId; i += 10) {
+        Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
+        fs.create(recoveredEdits);
+        HLog.Writer writer = HLogFactory.createRecoveredEditsWriter(fs, recoveredEdits, CONF);
+
+        long time = System.nanoTime();
+        WALEdit edit = null;
+        if (i == maxSeqId) {
+          edit = WALEdit.createCompaction(CompactionDescriptor.newBuilder()
+              .setTableName(ByteString.copyFrom(tableName.getName()))
+              .setFamilyName(ByteString.copyFrom(regionName))
+              .setEncodedRegionName(ByteString.copyFrom(regionName))
+              .setStoreHomeDirBytes(ByteString.copyFrom(Bytes.toBytes(regiondir.toString())))
+              .build());
+        } else {
+          edit = new WALEdit();
+          edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes
+              .toBytes(i)));
+        }
+        writer.append(new HLog.Entry(new HLogKey(regionName, tableName, i, time,
+            HConstants.DEFAULT_CLUSTER_ID), edit));
+        writer.close();
+      }
+
+      long recoverSeqId = 1030;
+      Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+      MonitoredTask status = TaskMonitor.get().createStatus(method);
+      for (Store store : region.getStores().values()) {
+        maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(), recoverSeqId - 1);
+      }
+      long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status);
+      assertEquals(maxSeqId, seqId);
+
+      // assert that the files are flushed
+      assertEquals(1, region.getStoreFileList(
+        region.getStores().keySet().toArray(new byte[0][])).size());
+
+    } finally {
+      HRegion.closeHRegion(this.region);
+      this.region = null;
+    }
+  }
+
+  @Test
  public void testRecoveredEditsReplayCompaction() throws Exception {
    String method = name.getMethodName();
    TableName tableName = TableName.valueOf(method);
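
Note for readers outside the JIRA context: replayRecoveredEdits() returns -1 for a recovered-edits file whose edits are all skipped, and the old plain assignment let such a trailing file (for example, one holding only a compaction marker, as the new test writes) clobber the sequence id recovered from earlier files. The Java sketch below models just that Math.max folding; it is illustrative only, not HBase code, and the names SeqIdFoldSketch, replayOneFile, and NOTHING_REPLAYED are invented. It deliberately does not model the other half of the fix, where hoisting the currentEditSeqId assignment above the per-KeyValue skip checks lets even a fully skipped file report the sequence id it covers, which is why the test expects the returned seqId to equal maxSeqId (1050).

import java.util.Arrays;
import java.util.List;

/**
 * Minimal model of the seqId folding fixed in the first HRegion hunk above.
 * All names are invented for illustration; only the Math.max accumulation
 * mirrors the actual patch.
 */
public class SeqIdFoldSketch {

  /** Sentinel meaning "every edit in this file was skipped". */
  static final long NOTHING_REPLAYED = -1L;

  /**
   * Stand-in for replaying one recovered-edits file: returns the highest
   * seqId it applied, or the sentinel when the whole file is filtered out.
   */
  static long replayOneFile(long fileMaxSeqId, boolean allEditsSkipped) {
    return allEditsSkipped ? NOTHING_REPLAYED : fileMaxSeqId;
  }

  public static void main(String[] args) {
    // Three recovered-edits files; the last holds only entries that get
    // skipped during replay, so it reports the sentinel.
    List<long[]> files = Arrays.asList(
        new long[] { 1030, 0 }, new long[] { 1040, 0 }, new long[] { 1050, 1 });

    long buggy = NOTHING_REPLAYED;
    long fixed = NOTHING_REPLAYED;
    for (long[] f : files) {
      long result = replayOneFile(f[0], f[1] == 1);
      buggy = result;                  // pre-patch: the last file always wins
      fixed = Math.max(fixed, result); // patched: the seqId only moves forward
    }
    // Prints "buggy=-1, fixed=1040": the plain assignment ends at the
    // sentinel, which would let the region hand out already-used sequence ids.
    System.out.println("buggy=" + buggy + ", fixed=" + fixed);
  }
}

Folding with Math.max also makes the accumulated seqId insensitive to which files happen to be skipped, so replay never moves the region's sequence id backwards.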