From cd5c76f937cdfc894f1bb377855428961dba8be9 Mon Sep 17 00:00:00 2001
From: Michael Stack
Date: Tue, 1 May 2018 23:46:41 -0700
Subject: [PATCH] HBASE-20503 [AsyncFSWAL] Failed to get sync result after
 300000 ms for txid=160912, WAL system stuck?

---
 .../io/asyncfs/FanOutOneBlockAsyncDFSOutput.java  |   6 +
 .../hbase/regionserver/wal/AbstractFSWAL.java     |   3 +-
 .../wal/TestAsyncFSWALBrokenStream.java           | 166 +++++++++++++++++++++
 3 files changed, 174 insertions(+), 1 deletion(-)
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALBrokenStream.java

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
index 1645d68be6..157b17ac0c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
@@ -552,6 +552,12 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
     datanodeList.forEach(ch -> ch.close());
     datanodeList.forEach(ch -> ch.closeFuture().awaitUninterruptibly());
     endFileLease(client, fileId);
+    // "If we fail to write to DN, the standard way to recover is to bump the GS at the NN
+    // side, reconnect to the DN, finish the block, and then complete the file normally. We can
+    // not call complete file directly, it does not work... Here we use recoverLease to get the
+    // same result, and the NN will help finish the block, since for the WAL we will not write to
+    // the stream any more. recoverAndClose will be called in a thread pool in background
+    // (verify!)? It should not block the write request." -- Duo, over in HBASE-20507.
     fsUtils.recoverFileLease(dfs, new Path(src), conf,
       reporter == null ? new CancelOnClose(client) : reporter);
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index ce8dafa4a5..85f85526d4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -607,7 +607,8 @@ public abstract class AbstractFSWAL implements WAL {
   /**
    * Archive old logs. A WAL is eligible for archiving if all its WALEdits have been flushed.
    */
-  private void cleanOldLogs() throws IOException {
+  @VisibleForTesting
+  void cleanOldLogs() throws IOException {
     List<Pair<Path, Long>> logsToArchive = null;
     // For each log file, look at its Map of regions to highest sequence id; if all sequence ids
     // are older than what is currently in memory, the WAL can be GC'd.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALBrokenStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALBrokenStream.java
new file mode 100644
index 0000000000..5c7399d676
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALBrokenStream.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+
+@Category({ MiscTests.class, MediumTests.class })
+public class TestAsyncFSWALBrokenStream {
+  private static final Logger LOG = LoggerFactory.getLogger(TestAsyncFSWALBrokenStream.class);
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestAsyncFSWALBrokenStream.class);
+  @Rule
+  public TestName name = new TestName();
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  @Rule
+  public final TestName currentTest = new TestName();
+
+  private static final byte [] COLUMN_FAMILY_NAME_BYTES = Bytes.toBytes("cf");
+
+  @Before
+  public void before() throws Exception {
+    // Make block sizes small.
+    TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024);
+    // Quicker heartbeat interval for faster DN death notification
+    TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
+    TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
+    TEST_UTIL.getConfiguration().setInt("dfs.client.socket-timeout", 5000);
+
+    // Faster failover with cluster.shutdown();fs.close() idiom
+    TEST_UTIL.getConfiguration().setInt("hbase.ipc.client.connect.max.retries", 1);
+    TEST_UTIL.getConfiguration().setInt("dfs.client.block.recovery.retries", 1);
+    TEST_UTIL.getConfiguration().setInt("hbase.ipc.client.connection.maxidletime", 500);
+    // Have three DNs and one RS.
+    TEST_UTIL.startMiniDFSCluster(3);
+    TEST_UTIL.startMiniCluster();
+  }
+
+  @After
+  public void after() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Tests WAL rolling, 'broken stream', and that WAL archiving doesn't mess us up.
+   * See HBASE-20503. This test is like TestFanOutOneBlockAsyncDFSOutput#testRecover but at a
+   * higher level.
+   */
+  @Test
+  public void testWALBrokenStream() throws IOException {
+    TableDescriptor tableDescriptor =
+        TableDescriptorBuilder.newBuilder(TableName.valueOf(currentTest.getMethodName()))
+            .setColumnFamily(ColumnFamilyDescriptorBuilder.of(COLUMN_FAMILY_NAME_BYTES)).build();
+    try (Admin admin = TEST_UTIL.getConnection().getAdmin()) {
+      admin.createTable(tableDescriptor);
+    }
+    int wrote = 0;
+    try (Table table = TEST_UTIL.getConnection().getTable(tableDescriptor.getTableName())) {
+      addEdits(table, wrote, 1);
+      wrote += 1;
+      // Restart one datanode, which breaks one of the WAL's connections.
+      Thread t = new Thread() {
+        @Override
+        public void run() {
+          HRegionServer hrs = TEST_UTIL.getHBaseCluster().getRegionServer(0);
+          LOG.info("RegionServer={}", hrs.getServerName());
+          for (int i = 0; i < 100; i++) {
+            try {
+              for (WAL wal : hrs.getWALs()) {
+                LOG.info("Clean old logs cycle={}", i);
+                ((AbstractFSWAL) wal).cleanOldLogs();
+              }
+            } catch (IOException e) {
+              e.printStackTrace();
+            }
+            Threads.sleep(100);
+          }
+        }
+      };
+      t.start();
+      TEST_UTIL.getDFSCluster().restartDataNode(0);
+      addEdits(table, wrote, 1);
+      wrote += 1;
+      int read = 0;
+      try (ResultScanner scanner = table.getScanner(new Scan())) {
+        for (Result result : scanner) {
+          LOG.info("RESULT={}", result);
+          read++;
+        }
+      }
+      Assert.assertEquals(wrote, read);
+      addEdits(table, wrote, 1);
+      wrote += 1;
+      addEdits(table, wrote, 1);
+      wrote += 1;
+      addEdits(table, wrote, 1);
+      wrote += 1;
+      read = 0;
+      try (ResultScanner scanner = table.getScanner(new Scan())) {
+        for (Result result : scanner) {
+          LOG.info("RESULT={}", result);
+          read++;
+        }
+      }
+      Assert.assertEquals(wrote, read);
+    }
+  }
+
+  static void addEdits(Table table, int index, int times)
+      throws IOException {
+    final byte[] row = Bytes.toBytes("row" + index);
+    for (int i = 0; i < times; i++) {
+      Put put = new Put(row);
+      put.addColumn(COLUMN_FAMILY_NAME_BYTES, row, row);
+      table.put(put);
+    }
+  }
+}
-- 
2.16.3
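
For context on the recoverFileLease call this patch comments: below is a minimal sketch of what lease
recovery looks like against a bare DistributedFileSystem, independent of FanOutOneBlockAsyncDFSOutput
and FSUtils.recoverFileLease. The class and method names (LeaseRecoverySketch, waitForLeaseRecovery)
and the 1-second poll interval are illustrative assumptions, not part of this patch; the HDFS calls
(recoverLease, isFileClosed) are standard DistributedFileSystem APIs.

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public final class LeaseRecoverySketch {
  /**
   * Ask the NameNode to recover the lease on the given path and wait until the file is closed.
   * The NameNode finishes the last block as part of recovery, which is why the WAL does not have
   * to complete the file itself once its output stream is broken.
   */
  static void waitForLeaseRecovery(DistributedFileSystem dfs, Path path) throws IOException {
    boolean closed = dfs.recoverLease(path);
    while (!closed) {
      try {
        // Block recovery is asynchronous on the NameNode side; poll until the file is closed.
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IOException("Interrupted waiting for lease recovery of " + path, e);
      }
      closed = dfs.isFileClosed(path);
    }
  }
}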