Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java	(revision 1552053)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java	(working copy)
@@ -416,168 +416,6 @@
   }
 
   /**
-   * Test that HLog is rolled when all data nodes in the pipeline have been
-   * restarted.
-   * @throws Exception
-   */
-  @Test
-  public void testLogRollOnPipelineRestart() throws Exception {
-    LOG.info("Starting testLogRollOnPipelineRestart");
-    assertTrue("This test requires HLog file replication.",
-      fs.getDefaultReplication() > 1);
-    LOG.info("Replication=" + fs.getDefaultReplication());
-    // When the hbase:meta table can be opened, the region servers are running
-    new HTable(TEST_UTIL.getConfiguration(), TableName.META_TABLE_NAME);
-
-    this.server = cluster.getRegionServer(0);
-    this.log = server.getWAL();
-
-    // Create the test table and open it
-    String tableName = getName();
-    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
-    desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
-
-    admin.createTable(desc);
-    HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
-
-    server = TEST_UTIL.getRSForFirstRegionInTable(Bytes.toBytes(tableName));
-    this.log = server.getWAL();
-    final List<Path> paths = new ArrayList<Path>();
-    final List<Integer> preLogRolledCalled = new ArrayList<Integer>();
-    paths.add(((FSHLog) log).computeFilename());
-    log.registerWALActionsListener(new WALActionsListener() {
-      @Override
-      public void preLogRoll(Path oldFile, Path newFile) {
-        LOG.debug("preLogRoll: oldFile=" + oldFile + " newFile=" + newFile);
-        preLogRolledCalled.add(new Integer(1));
-      }
-      @Override
-      public void postLogRoll(Path oldFile, Path newFile) {
-        paths.add(newFile);
-      }
-      @Override
-      public void preLogArchive(Path oldFile, Path newFile) {}
-      @Override
-      public void postLogArchive(Path oldFile, Path newFile) {}
-      @Override
-      public void logRollRequested() {}
-      @Override
-      public void logCloseRequested() {}
-      @Override
-      public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
-          WALEdit logEdit) {}
-      @Override
-      public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey,
-          WALEdit logEdit) {}
-    });
-
-    assertTrue("Need HDFS-826 for this test", ((FSHLog) log).canGetCurReplicas());
-    // don't run this test without append support (HDFS-200 & HDFS-142)
-    assertTrue("Need append support for this test", FSUtils
-        .isAppendSupported(TEST_UTIL.getConfiguration()));
-
-    writeData(table, 1002);
-
-    table.setAutoFlush(true, true);
-
-    long curTime = System.currentTimeMillis();
-    long oldFilenum = log.getFilenum();
-    assertTrue("Log should have a timestamp older than now",
-        curTime > oldFilenum && oldFilenum != -1);
-
-    assertTrue("The log shouldn't have rolled yet", oldFilenum == log.getFilenum());
-
-    // roll all datanodes in the pipeline
-    dfsCluster.restartDataNodes();
-    Thread.sleep(1000);
-    dfsCluster.waitActive();
-    LOG.info("Data Nodes restarted");
-    validateData(table, 1002);
-
-    // this write should succeed, but trigger a log roll
-    writeData(table, 1003);
-    long newFilenum = log.getFilenum();
-
-    assertTrue("Missing datanode should've triggered a log roll",
-        newFilenum > oldFilenum && newFilenum > curTime);
-    validateData(table, 1003);
-
-    writeData(table, 1004);
-
-    // roll all datanode again
-    dfsCluster.restartDataNodes();
-    Thread.sleep(1000);
-    dfsCluster.waitActive();
-    LOG.info("Data Nodes restarted");
-    validateData(table, 1004);
-
-    // this write should succeed, but trigger a log roll
-    writeData(table, 1005);
-
-    // force a log roll to read back and verify previously written logs
-    log.rollWriter(true);
-    assertTrue("preLogRolledCalled has size of " + preLogRolledCalled.size(),
-        preLogRolledCalled.size() >= 1);
-
-    // read back the data written
-    Set<String> loggedRows = new HashSet<String>();
-    FSUtils fsUtils = FSUtils.getInstance(fs, TEST_UTIL.getConfiguration());
-    for (Path p : paths) {
-      LOG.debug("recovering lease for " + p);
-      fsUtils.recoverFileLease(((HFileSystem)fs).getBackingFs(), p, TEST_UTIL.getConfiguration(), null);
-
-      LOG.debug("Reading HLog " + FSUtils.getPath(p));
-      HLog.Reader reader = null;
-      try {
-        reader = HLogFactory.createReader(fs, p,
-            TEST_UTIL.getConfiguration());
-        HLog.Entry entry;
-        while ((entry = reader.next()) != null) {
-          LOG.debug("#" + entry.getKey().getLogSeqNum() + ": " + entry.getEdit().getKeyValues());
-          for (KeyValue kv : entry.getEdit().getKeyValues()) {
-            loggedRows.add(Bytes.toStringBinary(kv.getRow()));
-          }
-        }
-      } catch (EOFException e) {
-        LOG.debug("EOF reading file " + FSUtils.getPath(p));
-      } finally {
-        if (reader != null) reader.close();
-      }
-    }
-
-    // verify the written rows are there
-    assertTrue(loggedRows.contains("row1002"));
-    assertTrue(loggedRows.contains("row1003"));
-    assertTrue(loggedRows.contains("row1004"));
-    assertTrue(loggedRows.contains("row1005"));
-
-    // flush all regions
-    List<HRegion> regions =
-        new ArrayList<HRegion>(server.getOnlineRegionsLocalContext());
-    for (HRegion r : regions) {
-      r.flushcache();
-    }
-
-    ResultScanner scanner = table.getScanner(new Scan());
-    try {
-      for (int i = 2; i <= 5; i++) {
-        Result r = scanner.next();
-        assertNotNull(r);
-        assertFalse(r.isEmpty());
-        assertEquals("row100" + i, Bytes.toString(r.getRow()));
-      }
-    } finally {
-      scanner.close();
-    }
-
-    // verify that no region servers aborted
-    for (JVMClusterUtil.RegionServerThread rsThread :
-        TEST_UTIL.getHBaseCluster().getRegionServerThreads()) {
-      assertFalse(rsThread.getRegionServer().isAborted());
-    }
-  }
-
-  /**
    * Tests that logs are deleted when some region has a compaction
    * record in WAL and no other records. See HBASE-8597.
    */
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling2.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling2.java	(revision 0)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling2.java	(revision 0)
@@ -0,0 +1,383 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
+import org.apache.log4j.Level;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test that the HLog is rolled when all datanodes in the write pipeline are restarted.
+ */
+@Category(LargeTests.class)
+public class TestLogRolling2 {
+  private static final Log LOG = LogFactory.getLog(TestLogRolling2.class);
+  private HRegionServer server;
+  private HLog log;
+  private String tableName;
+  private byte[] value;
+  private FileSystem fs;
+  private MiniDFSCluster dfsCluster;
+  private HBaseAdmin admin;
+  private MiniHBaseCluster cluster;
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  // verbose logging on classes that are touched in these tests
+  {
+    ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)LogFactory.getLog("org.apache.hadoop.hdfs.server.namenode.FSNamesystem"))
+        .getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)HRegionServer.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)HRegion.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)HLog.LOG).getLogger().setLevel(Level.ALL);
+  }
+
+  /**
+   * constructor
+   */
+  public TestLogRolling2() {
+    this.server = null;
+    this.log = null;
+    this.tableName = null;
+
+    String className = this.getClass().getName();
+    StringBuilder v = new StringBuilder(className);
+    while (v.length() < 1000) {
+      v.append(className);
+    }
+    this.value = Bytes.toBytes(v.toString());
+  }
+
+  // Need to override this setup so we can edit the config before it gets sent
+  // to the HDFS & HBase cluster startup.
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // TODO: testLogRollOnDatanodeDeath fails if short circuit reads are on under the hadoop2
+    // profile. See HBASE-9337 for related issues.
+    System.setProperty("hbase.tests.use.shortcircuit.reads", "false");
+
+    /**** configuration for testLogRolling ****/
+    // Force a region split after every 768KB
+    TEST_UTIL.getConfiguration().setLong(HConstants.HREGION_MAX_FILESIZE, 768L * 1024L);
+
+    // We roll the log after every 32 writes
+    TEST_UTIL.getConfiguration().setInt("hbase.regionserver.maxlogentries", 32);
+
+    TEST_UTIL.getConfiguration().setInt(
+        "hbase.regionserver.logroll.errors.tolerated", 2);
+    TEST_UTIL.getConfiguration().setInt("ipc.ping.interval", 10 * 1000);
+    TEST_UTIL.getConfiguration().setInt("ipc.socket.timeout", 10 * 1000);
+    TEST_UTIL.getConfiguration().setInt("hbase.rpc.timeout", 10 * 1000);
+
+    // For less frequently updated regions flush after every 2 flushes
+    TEST_UTIL.getConfiguration().setInt("hbase.hregion.memstore.optionalflushcount", 2);
+
+    // We flush the cache after every 8192 bytes
+    TEST_UTIL.getConfiguration().setInt(
+        HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 8192);
+
+    // Increase the amount of time between client retries
+    TEST_UTIL.getConfiguration().setLong("hbase.client.pause", 10 * 1000);
+
+    // Reduce thread wake frequency so that other threads can get
+    // a chance to run.
+    TEST_UTIL.getConfiguration().setInt(HConstants.THREAD_WAKE_FREQUENCY, 2 * 1000);
+
+    /**** configuration for testLogRollOnDatanodeDeath ****/
+    // make sure log.hflush() calls syncFs() to open a pipeline
+    TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
+    // lower the namenode & datanode heartbeat so the namenode
+    // quickly detects datanode failures
+    TEST_UTIL.getConfiguration().setInt("heartbeat.recheck.interval", 5000);
+    TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
+    // the namenode might still try to choose the recently-dead datanode
+    // for a pipeline, so try to create a new pipeline multiple times
+    TEST_UTIL.getConfiguration().setInt("dfs.client.block.write.retries", 30);
+    TEST_UTIL.getConfiguration().setInt(
+        "hbase.regionserver.hlog.tolerable.lowreplication", 2);
+    TEST_UTIL.getConfiguration().setInt(
+        "hbase.regionserver.hlog.lowreplication.rolllimit", 3);
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    TEST_UTIL.startMiniCluster(1, 1, 2);
+
+    cluster = TEST_UTIL.getHBaseCluster();
+    dfsCluster = TEST_UTIL.getDFSCluster();
+    fs = TEST_UTIL.getTestFileSystem();
+    admin = TEST_UTIL.getHBaseAdmin();
+
+    // disable region rebalancing (interferes with log watching)
+    cluster.getMaster().balanceSwitch(false);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  private static String getName() {
+    return "TestLogRolling2";
+  }
+
+  void writeData(HTable table, int rownum) throws IOException {
+    doPut(table, rownum);
+
+    // sleep to let the log roller run (if it needs to)
+    try {
+      Thread.sleep(2000);
+    } catch (InterruptedException e) {
+      // continue
+    }
+  }
+
+  void validateData(HTable table, int rownum) throws IOException {
+    String row = "row" + String.format("%1$04d", rownum);
+    Get get = new Get(Bytes.toBytes(row));
+    get.addFamily(HConstants.CATALOG_FAMILY);
+    Result result = table.get(get);
+    assertTrue(result.size() == 1);
+    assertTrue(Bytes.equals(value,
+        result.getValue(HConstants.CATALOG_FAMILY, null)));
+    LOG.info("Validated row " + row);
+  }
+
+  /**
+   * Test that HLog is rolled when all data nodes in the pipeline have been
+   * restarted.
+   * @throws Exception
+   */
+  @Test
+  public void testLogRollOnPipelineRestart() throws Exception {
+    LOG.info("Starting testLogRollOnPipelineRestart");
+    assertTrue("This test requires HLog file replication.",
+        fs.getDefaultReplication() > 1);
+    LOG.info("Replication=" + fs.getDefaultReplication());
+    // When the hbase:meta table can be opened, the region servers are running
+    new HTable(TEST_UTIL.getConfiguration(), TableName.META_TABLE_NAME);
+
+    this.server = cluster.getRegionServer(0);
+    this.log = server.getWAL();
+
+    // Create the test table and open it
+    String tableName = getName();
+    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
+    desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY));
+
+    admin.createTable(desc);
+    HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
+
+    server = TEST_UTIL.getRSForFirstRegionInTable(Bytes.toBytes(tableName));
+    this.log = server.getWAL();
+    final List<Path> paths = new ArrayList<Path>();
+    final List<Integer> preLogRolledCalled = new ArrayList<Integer>();
+    paths.add(((FSHLog) log).computeFilename());
+    log.registerWALActionsListener(new WALActionsListener() {
+      @Override
+      public void preLogRoll(Path oldFile, Path newFile) {
+        LOG.debug("preLogRoll: oldFile=" + oldFile + " newFile=" + newFile);
+        preLogRolledCalled.add(new Integer(1));
+      }
+      @Override
+      public void postLogRoll(Path oldFile, Path newFile) {
+        paths.add(newFile);
+      }
+      @Override
+      public void preLogArchive(Path oldFile, Path newFile) {}
+      @Override
+      public void postLogArchive(Path oldFile, Path newFile) {}
+      @Override
+      public void logRollRequested() {}
+      @Override
+      public void logCloseRequested() {}
+      @Override
+      public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
+          WALEdit logEdit) {}
+      @Override
+      public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey,
+          WALEdit logEdit) {}
+    });
+
+    assertTrue("Need HDFS-826 for this test", ((FSHLog) log).canGetCurReplicas());
+    // don't run this test without append support (HDFS-200 & HDFS-142)
+    assertTrue("Need append support for this test", FSUtils
+        .isAppendSupported(TEST_UTIL.getConfiguration()));
+
+    writeData(table, 1002);
+
+    table.setAutoFlush(true, true);
+
+    long curTime = System.currentTimeMillis();
+    long oldFilenum = log.getFilenum();
+    assertTrue("Log should have a timestamp older than now",
+        curTime > oldFilenum && oldFilenum != -1);
+
+    assertTrue("The log shouldn't have rolled yet", oldFilenum == log.getFilenum());
+
+    // roll all datanodes in the pipeline
+    dfsCluster.restartDataNodes();
+    Thread.sleep(1000);
+    dfsCluster.waitActive();
+    LOG.info("Data Nodes restarted");
+    validateData(table, 1002);
+
+    // this write should succeed, but trigger a log roll
+    writeData(table, 1003);
+    long newFilenum = log.getFilenum();
+
+    assertTrue("Missing datanode should've triggered a log roll",
+        newFilenum > oldFilenum && newFilenum > curTime);
+    validateData(table, 1003);
+
+    writeData(table, 1004);
+
+    // roll all datanodes again
+    dfsCluster.restartDataNodes();
+    Thread.sleep(1000);
+    dfsCluster.waitActive();
+    LOG.info("Data Nodes restarted");
+    validateData(table, 1004);
+
+    // this write should succeed, but trigger a log roll
+    writeData(table, 1005);
+
+    // force a log roll to read back and verify previously written logs
+    log.rollWriter(true);
+    assertTrue("preLogRolledCalled has size of " + preLogRolledCalled.size(),
+        preLogRolledCalled.size() >= 1);
+
+    // read back the data written
+    Set<String> loggedRows = new HashSet<String>();
+    FSUtils fsUtils = FSUtils.getInstance(fs,
+        TEST_UTIL.getConfiguration());
+    for (Path p : paths) {
+      LOG.debug("recovering lease for " + p);
+      fsUtils.recoverFileLease(((HFileSystem)fs).getBackingFs(), p, TEST_UTIL.getConfiguration(), null);
+
+      LOG.debug("Reading HLog " + FSUtils.getPath(p));
+      HLog.Reader reader = null;
+      try {
+        reader = HLogFactory.createReader(fs, p,
+            TEST_UTIL.getConfiguration());
+        HLog.Entry entry;
+        while ((entry = reader.next()) != null) {
+          LOG.debug("#" + entry.getKey().getLogSeqNum() + ": " + entry.getEdit().getKeyValues());
+          for (KeyValue kv : entry.getEdit().getKeyValues()) {
+            loggedRows.add(Bytes.toStringBinary(kv.getRow()));
+          }
+        }
+      } catch (EOFException e) {
+        LOG.debug("EOF reading file " + FSUtils.getPath(p));
+      } finally {
+        if (reader != null) reader.close();
+      }
+    }
+
+    // verify the written rows are there
+    assertTrue(loggedRows.contains("row1002"));
+    assertTrue(loggedRows.contains("row1003"));
+    assertTrue(loggedRows.contains("row1004"));
+    assertTrue(loggedRows.contains("row1005"));
+
+    // flush all regions
+    List<HRegion> regions =
+        new ArrayList<HRegion>(server.getOnlineRegionsLocalContext());
+    for (HRegion r : regions) {
+      r.flushcache();
+    }
+
+    ResultScanner scanner = table.getScanner(new Scan());
+    try {
+      for (int i = 2; i <= 5; i++) {
+        Result r = scanner.next();
+        assertNotNull(r);
+        assertFalse(r.isEmpty());
+        assertEquals("row100" + i, Bytes.toString(r.getRow()));
+      }
+    } finally {
+      scanner.close();
+    }
+
+    // verify that no region servers aborted
+    for (JVMClusterUtil.RegionServerThread rsThread :
+        TEST_UTIL.getHBaseCluster().getRegionServerThreads()) {
+      assertFalse(rsThread.getRegionServer().isAborted());
+    }
+  }
+
+  private void doPut(HTable table, int i) throws IOException {
+    Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", i)));
+    put.add(HConstants.CATALOG_FAMILY, null, value);
+    table.put(put);
+  }
+}
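
Reviewer note (not part of the patch): the moved test detects rolls two ways at once, by registering a WALActionsListener and by comparing log.getFilenum() before and after the pipeline is disturbed. Below is a minimal sketch of that idiom in isolation, assuming the same HLog / WALActionsListener / rollWriter APIs used in the patch above; `wal`, `rolls`, and `rolledFiles` are illustrative names, and `wal` stands in for a live HLog taken from a running region server as in the test.

  // Count rolls and collect the new WAL files as they are created.
  final AtomicInteger rolls = new AtomicInteger(0);   // java.util.concurrent.atomic.AtomicInteger
  final List<Path> rolledFiles = new ArrayList<Path>();
  wal.registerWALActionsListener(new WALActionsListener() {
    @Override
    public void preLogRoll(Path oldFile, Path newFile) {
      rolls.incrementAndGet();          // a roll is about to happen
    }
    @Override
    public void postLogRoll(Path oldFile, Path newFile) {
      rolledFiles.add(newFile);         // keep paths to read edits back later
    }
    // the remaining callbacks are not needed for roll detection
    @Override public void preLogArchive(Path oldFile, Path newFile) {}
    @Override public void postLogArchive(Path oldFile, Path newFile) {}
    @Override public void logRollRequested() {}
    @Override public void logCloseRequested() {}
    @Override public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
        WALEdit logEdit) {}
    @Override public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey,
        WALEdit logEdit) {}
  });

  // A roll is visible both through the listener and as a larger file number.
  long before = wal.getFilenum();
  wal.rollWriter(true);                 // force a roll
  assertTrue(rolls.get() >= 1);         // listener observed the roll
  assertTrue(wal.getFilenum() > before);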