Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java	(revision 1307712)
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java	(working copy)
@@ -54,9 +54,11 @@
 import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.BinaryComparator;
 import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
@@ -67,6 +69,7 @@
 import org.apache.hadoop.hbase.filter.PrefixFilter;
 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
 import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
@@ -136,6 +139,95 @@
     SchemaMetrics.validateMetricChanges(startingMetrics);
   }
 
+  public void testDataCorrectnessReplayingRecoveredEdits() throws Exception {
+    final int NUM_MASTERS = 1;
+    final int NUM_RS = 3;
+    TEST_UTIL.startMiniCluster(NUM_MASTERS, NUM_RS);
+
+    try {
+      final byte[] TABLENAME = Bytes
+          .toBytes("testDataCorrectnessReplayingRecoveredEdits");
+      final byte[] FAMILY = Bytes.toBytes("family");
+      MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
+      HMaster master = cluster.getMaster();
+
+      // Create table
+      HTableDescriptor desc = new HTableDescriptor(TABLENAME);
+      desc.addFamily(new HColumnDescriptor(FAMILY));
+      HBaseAdmin hbaseAdmin = TEST_UTIL.getHBaseAdmin();
+      hbaseAdmin.createTable(desc);
+
+      assertTrue(hbaseAdmin.isTableAvailable(TABLENAME));
+
+      // Put data: r1->v1
+      HTable table = new HTable(TEST_UTIL.getConfiguration(), TABLENAME);
+      putDataAndVerify(table, "r1", FAMILY, "v1", 1);
+
+      // Move region to target server
+      HRegionInfo regionInfo = table.getRegionLocation("r1").getRegionInfo();
+      int originServerNum = cluster.getServerWith(regionInfo.getRegionName());
+      HRegionServer originServer = cluster.getRegionServer(originServerNum);
+      int targetServerNum = NUM_RS - 1 - originServerNum;
+      HRegionServer targetServer = cluster.getRegionServer(targetServerNum);
+      hbaseAdmin.move(regionInfo.getEncodedNameAsBytes(),
+          Bytes.toBytes(targetServer.getServerName().getServerName()));
+      do {
+        Thread.sleep(1);
+      } while (cluster.getServerWith(regionInfo.getRegionName()) == originServerNum);
+
+      // Put data: r2->v2
+      putDataAndVerify(table, "r2", FAMILY, "v2", 2);
+
+      // Move region back to origin server
+      hbaseAdmin.move(regionInfo.getEncodedNameAsBytes(),
+          Bytes.toBytes(originServer.getServerName().getServerName()));
+      do {
+        Thread.sleep(1);
+      } while (cluster.getServerWith(regionInfo.getRegionName()) == targetServerNum);
+
+      // Put data: r3->v3
+      putDataAndVerify(table, "r3", FAMILY, "v3", 3);
+
+      // Kill target server
+      targetServer.kill();
+      cluster.getRegionServerThreads().get(targetServerNum).join();
+      // Wait until the master finishes processing the server shutdown
+      while (master.getServerManager().areDeadServersInProgress()) {
+        Thread.sleep(5);
+      }
+      // Kill origin server
+      originServer.kill();
+      cluster.getRegionServerThreads().get(originServerNum).join();
+
+      // Put data: r4->v4
+      putDataAndVerify(table, "r4", FAMILY, "v4", 4);
+
+    } finally {
+      TEST_UTIL.shutdownMiniCluster();
+    }
+  }
+
+  private void putDataAndVerify(HTable table, String row, byte[] family,
+      String value, int verifyNum) throws IOException {
+    System.out.println("=========Putting data :" + row);
+    Put put = new Put(Bytes.toBytes(row));
+    put.add(family, Bytes.toBytes("q1"), Bytes.toBytes(value));
+    table.put(put);
+    ResultScanner resultScanner = table.getScanner(new Scan());
+    List<Result> results = new ArrayList<Result>();
+    while (true) {
+      Result r = resultScanner.next();
+      if (r == null)
+        break;
+      results.add(r);
+    }
+    resultScanner.close();
+    if (results.size() != verifyNum) {
+      System.out.println(results);
+    }
+    assertEquals(verifyNum, results.size());
+  }
+
   //////////////////////////////////////////////////////////////////////////////
   // New tests that doesn't spin up a mini cluster but rather just test the
   // individual code pieces in the HRegion. Putting files locally in
Index: src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java	(revision 1307712)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java	(working copy)
@@ -426,6 +426,7 @@
         }
       }
       wap.w.append(entry);
+      outputSink.updateRegionMaximumEditLogSeqNum(entry);
       editsCount++;
       // If sufficient edits have passed OR we've opened a few files, check if
       // we should report progress.
@@ -455,7 +456,8 @@
       throw e;
     } finally {
       int n = 0;
-      for (Object o : logWriters.values()) {
+      for (Map.Entry<byte[], Object> logWritersEntry : logWriters.entrySet()) {
+        Object o = logWritersEntry.getValue();
         long t1 = EnvironmentEdgeManager.currentTimeMillis();
         if ((t1 - last_report_at) > period) {
           last_report_at = t;
@@ -471,7 +473,8 @@
           WriterAndPath wap = (WriterAndPath)o;
           wap.w.close();
           LOG.debug("Closed " + wap.p);
-          Path dst = getCompletedRecoveredEditsFilePath(wap.p);
+          Path dst = getCompletedRecoveredEditsFilePath(wap.p, outputSink
+              .getRegionMaximumEditLogSeqNum(logWritersEntry.getKey()));
           if (!dst.equals(wap.p) && fs.exists(dst)) {
             LOG.warn("Found existing old edits file. It could be the "
                 + "result of a previous failed split attempt. Deleting " + dst
@@ -488,6 +491,7 @@
           if (!fs.rename(wap.p, dst)) {
             throw new IOException("Failed renaming " + wap.p + " to " + dst);
           }
+          LOG.debug("Renamed " + wap.p + " to " + dst);
         }
       }
       String msg = "Processed " + editsCount + " edits across " + n + " regions" +
@@ -681,16 +685,15 @@
   }
 
   /**
-   * Convert path to a file under RECOVERED_EDITS_DIR directory without
-   * RECOVERED_LOG_TMPFILE_SUFFIX
+   * Get the completed recovered edits file path, whose file name is the
+   * region's maximum edit log sequence number.
   * @param srcPath
-   * @return dstPath without RECOVERED_LOG_TMPFILE_SUFFIX
+   * @param maximumEditLogSeqNum
+   * @return dstPath, named with the region's maximum edit log sequence number
    */
-  static Path getCompletedRecoveredEditsFilePath(Path srcPath) {
-    String fileName = srcPath.getName();
-    if (fileName.endsWith(HLog.RECOVERED_LOG_TMPFILE_SUFFIX)) {
-      fileName = fileName.split(HLog.RECOVERED_LOG_TMPFILE_SUFFIX)[0];
-    }
+  static Path getCompletedRecoveredEditsFilePath(Path srcPath,
+      Long maximumEditLogSeqNum) {
+    String fileName = formatRecoveredEditsFileName(maximumEditLogSeqNum);
     return new Path(srcPath.getParent(), fileName);
   }
 
@@ -1027,6 +1030,7 @@
     }
   }
 
+
   private void writeBuffer(RegionEntryBuffer buffer) throws IOException {
     List<Entry> entries = buffer.entryBuffer;
     if (entries.isEmpty()) {
@@ -1050,6 +1054,7 @@
         }
       }
       wap.w.append(logEntry);
+      outputSink.updateRegionMaximumEditLogSeqNum(logEntry);
       editsCount++;
     }
     // Pass along summary statistics
@@ -1138,6 +1143,8 @@
   class OutputSink {
     private final Map<byte[], WriterAndPath> logWriters = Collections.synchronizedMap(
           new TreeMap<byte[], WriterAndPath>(Bytes.BYTES_COMPARATOR));
+    private final Map<byte[], Long> regionMaximumEditLogSeqNum = Collections
+        .synchronizedMap(new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR));
     private final List<WriterThread> writerThreads = Lists.newArrayList();
 
     /* Set of regions which we've decided should not output edits */
@@ -1204,8 +1211,11 @@
       List<Path> paths = new ArrayList<Path>();
       List<IOException> thrown = Lists.newArrayList();
       closeLogWriters(thrown);
-      for (WriterAndPath wap : logWriters.values()) {
-        Path dst = getCompletedRecoveredEditsFilePath(wap.p);
+      for (Map.Entry<byte[], WriterAndPath> logWritersEntry : logWriters
+          .entrySet()) {
+        WriterAndPath wap = logWritersEntry.getValue();
+        Path dst = getCompletedRecoveredEditsFilePath(wap.p,
+            regionMaximumEditLogSeqNum.get(logWritersEntry.getKey()));
         try {
           if (!dst.equals(wap.p) && fs.exists(dst)) {
             LOG.warn("Found existing old edits file. It could be the "
@@ -1223,6 +1233,7 @@
           if (!fs.rename(wap.p, dst)) {
             throw new IOException("Failed renaming " + wap.p + " to " + dst);
           }
+          LOG.debug("Renamed " + wap.p + " to " + dst);
         }
       } catch (IOException ioe) {
         LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe);
@@ -1290,6 +1301,18 @@
     }
 
     /**
+     * Update the region's maximum edit log sequence number.
+     */
+    void updateRegionMaximumEditLogSeqNum(Entry entry) {
+      regionMaximumEditLogSeqNum.put(entry.getKey().getEncodedRegionName(),
+          entry.getKey().getLogSeqNum());
+    }
+
+    Long getRegionMaximumEditLogSeqNum(byte[] region) {
+      return regionMaximumEditLogSeqNum.get(region);
+    }
+
+    /**
      * @return a map from encoded region ID to the number of edits written out
      *         for that region.
      */
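
Note (not part of the patch): with this change, a completed recovered-edits file is named by the highest edit log sequence number it contains, instead of by stripping the .temp suffix from the split-time name. Below is a minimal sketch of the naming round-trip. formatRecoveredEditsFileName already exists in HLogSplitter and is called by the patch; the 19-digit zero-padded body shown here is an assumption, as is the example path.

    import org.apache.hadoop.fs.Path;

    public class RecoveredEditsNamingSketch {
      // Assumed behavior of HLogSplitter.formatRecoveredEditsFileName:
      // zero-pad so recovered-edits files sort lexicographically by seq num.
      static String formatRecoveredEditsFileName(long seqNum) {
        return String.format("%019d", seqNum);
      }

      public static void main(String[] args) {
        // A temporary edits file written during log splitting (illustrative).
        Path tmp = new Path(
            "/hbase/t1/region/recovered.edits/0000000000000000010.temp");
        long regionMaximumEditLogSeqNum = 17L;
        // The patched getCompletedRecoveredEditsFilePath keeps the parent
        // directory and replaces the file name with the region's max seq num.
        Path dst = new Path(tmp.getParent(),
            formatRecoveredEditsFileName(regionMaximumEditLogSeqNum));
        System.out.println(dst); // .../recovered.edits/0000000000000000017
      }
    }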
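
The reason the name matters, and what the test's server kills exercise: on region reopen, replay code can read the maximum sequence number straight from the file name and decide whether a recovered-edits file still contains unflushed edits. A hedged sketch of that consumer-side check follows; shouldReplay and minFlushedSeqId are illustrative names, not the actual HRegion API.

    // Sketch: deciding from the file name alone whether a recovered-edits
    // file needs replaying. Illustrative only; the real check lives in
    // HRegion's recovered-edits replay path.
    static boolean shouldReplay(org.apache.hadoop.fs.Path editsFile,
        long minFlushedSeqId) {
      // With this patch the file name is the maximum seq num of its edits.
      long maxSeqNumInFile = Long.parseLong(editsFile.getName());
      // Every edit in the file has seqNum <= maxSeqNumInFile, so a file whose
      // name is <= the already-flushed seq num can be skipped entirely.
      return maxSeqNumInFile > minFlushedSeqId;
    }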