From b9fd503e54cf444a3242e08ba04ce539ba457b6f Mon Sep 17 00:00:00 2001 From: Vincent Poon Date: Wed, 1 Nov 2017 15:39:56 -0700 Subject: [PATCH] HBASE-12125 Add Hbck option to check and fix WAL's from replication queue --- .../hbase/replication/ReplicationQueuesClient.java | 11 + .../replication/ReplicationQueuesClientZKImpl.java | 9 + .../hbase/replication/ReplicationQueuesZKImpl.java | 23 +- .../hbase/replication/ReplicationStateZKBase.java | 45 ++- .../TableBasedReplicationQueuesClientImpl.java | 6 + .../regionserver/DumpReplicationQueues.java | 3 +- .../regionserver/RecoveredReplicationSource.java | 73 +++-- .../regionserver/ReplicationSource.java | 1 - .../org/apache/hadoop/hbase/util/HBaseFsck.java | 76 ++++- .../hadoop/hbase/util/hbck/ReplicationChecker.java | 302 +++++++++++++++++- .../org/apache/hadoop/hbase/wal/WALSplitter.java | 36 ++- .../hbase/util/TestHBaseFsckReplication.java | 354 +++++++++++++++++++++ .../hadoop/hbase/util/hbck/HbckTestingUtil.java | 4 +- 13 files changed, 872 insertions(+), 71 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckReplication.java diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java index 2c513fa..4cd5561 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java @@ -55,6 +55,17 @@ public interface ReplicationQueuesClient { List getLogsInQueue(String serverName, String queueId) throws KeeperException; /** + * Get the current location in the log that we are replicating from + * @param serverName the server name of the region server that owns the queue + * @param queueId a String that identifies the queue + * @param filename log filename + * @return current position 
we are replicating from + * @throws ReplicationException thrown for problems accessing the queue + */ + long getLogPosition(String serverName, String queueId, String filename) + throws ReplicationException; + + /** * Get a list of all queues for the specified region server. * @param serverName the server name of the region server that owns the set of queues * @return a list of queueIds, null if this region server is not a replicator. diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java index 95b2e04..058c0f7 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java @@ -75,6 +75,15 @@ public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implem } @Override + public long getLogPosition(String serverName, String queueId, String filename) + throws ReplicationException { + String znode = ZKUtil.joinZNode(this.queuesZNode, serverName); + znode = ZKUtil.joinZNode(znode, queueId); + znode = ZKUtil.joinZNode(znode, filename); + return getLogPositionFromZNode(queueId, filename, znode); + } + + @Override public List getAllQueues(String serverName) throws KeeperException { String znode = ZKUtil.joinZNode(this.queuesZNode, serverName); List result = null; diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java index 8e61df9..cc1fd28 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java @@ -30,9 +30,6 @@ import 
org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.exceptions.DeserializationException; -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.zookeeper.ZKUtil; @@ -153,25 +150,7 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R public long getLogPosition(String queueId, String filename) throws ReplicationException { String clusterZnode = ZKUtil.joinZNode(this.myQueuesZnode, queueId); String znode = ZKUtil.joinZNode(clusterZnode, filename); - byte[] bytes = null; - try { - bytes = ZKUtil.getData(this.zookeeper, znode); - } catch (KeeperException e) { - throw new ReplicationException("Internal Error: could not get position in log for queueId=" - + queueId + ", filename=" + filename, e); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return 0; - } - try { - return ZKUtil.parseWALPositionFrom(bytes); - } catch (DeserializationException de) { - LOG.warn("Failed to parse WALPosition for queueId=" + queueId + " and wal=" + filename - + " znode content, continuing."); - } - // if we can not parse the position, start at the beginning of the wal file - // again - return 0; + return getLogPositionFromZNode(queueId, filename, znode); } @Override diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java index c6501e1..3ce1234 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java @@ 
-22,17 +22,19 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.List; -import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; - -import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; -import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; import org.apache.hadoop.hbase.zookeeper.ZKConfig; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; @@ -41,8 +43,8 @@ import org.apache.zookeeper.KeeperException; */ @InterfaceAudience.Private public abstract class ReplicationStateZKBase { - - /** + private static final Log LOG = LogFactory.getLog(ReplicationStateZKBase.class); + /** * The name of the znode that contains the replication status of a remote slave (i.e. peer) * cluster. 
*/ @@ -152,4 +154,35 @@ public abstract class ReplicationStateZKBase { protected String getPeerNode(String id) { return ZKUtil.joinZNode(this.peersZNode, id); } + + /** + * Fetch the log position of the log at the given znode + * @param queueId only used for logging + * @param filename only used for logging + * @param znode Full znode of the log + * @return the log position + * @throws ReplicationException Zookeeper exception + */ + protected long getLogPositionFromZNode(String queueId, String filename, String znode) + throws ReplicationException { + byte[] bytes = null; + try { + bytes = ZKUtil.getData(this.zookeeper, znode); + } catch (KeeperException e) { + throw new ReplicationException("Internal Error: could not get position in log for queueId=" + + queueId + ", filename=" + filename + ", znode=" + znode, e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return 0; + } + try { + return ZKUtil.parseWALPositionFrom(bytes); + } catch (DeserializationException de) { + LOG.warn("Failed to parse WALPosition for queueId=" + queueId + " and wal=" + filename + + " znode content, continuing."); + } + // if we can not parse the position, start at the beginning of the wal file + // again + return 0; + } } diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java index 0a8ed31..d9d6e0e 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableBasedReplicationQueuesClientImpl.java @@ -110,4 +110,10 @@ public class TableBasedReplicationQueuesClientImpl extends ReplicationTableBase // TODO throw new NotImplementedException(HConstants.NOT_IMPLEMENTED); } + @Override + public long getLogPosition(String serverName, String queueId, 
String filename) + throws ReplicationException { + // TODO + throw new NotImplementedException(HConstants.NOT_IMPLEMENTED); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java index 9d38026..492a2a6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java @@ -257,7 +257,8 @@ public class DumpReplicationQueues extends Configured implements Tool { StringBuilder sb = new StringBuilder(); if (!deletedQueues.isEmpty()) { sb.append("Found " + deletedQueues.size() + " deleted queues" - + ", run hbck -fixReplication in order to remove the deleted replication queues\n"); + + ", run hbck -fixReplicationUndeletedQueues in order to" + + " remove the deleted replication queues\n"); for (String deletedQueue : deletedQueues) { sb.append(" " + deletedQueue + "\n"); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java index cabf85a..2d9e6f8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationPeers; +import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueues; import 
org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Threads; @@ -89,9 +90,34 @@ public class RecoveredReplicationSource extends ReplicationSource { } public void locateRecoveredPaths(PriorityBlockingQueue queue) throws IOException { - boolean hasPathChanged = false; PriorityBlockingQueue newPaths = new PriorityBlockingQueue(queueSizePerGroup, new LogsComparator()); + boolean hasPathChanged = locateNewPaths(queue, newPaths, fs, server, replicationQueueInfo, + conf, manager.getLogDir()); + + if (hasPathChanged) { + if (newPaths.size() != queue.size()) { // this shouldn't happen + String errorMsg = String.format( + "Recovery queue size is incorrect, expected %s but got %s after locating new wal paths", + queue.size(), newPaths.size()); + LOG.error(errorMsg); + throw new IOException(errorMsg); + } + // put the correct locations in the queue + // since this is a recovered queue with no new incoming logs, + // there shouldn't be any concurrency issues + queue.clear(); + for (Path path : newPaths) { + queue.add(path); + } + } + } + + private static boolean locateNewPaths(PriorityBlockingQueue queue, + PriorityBlockingQueue newPaths, FileSystem fs, Server server, + ReplicationQueueInfo replicationQueueInfo, Configuration conf, Path logDir) + throws IOException { + boolean hasPathChanged = false; pathsLoop: for (Path path : queue) { if (fs.exists(path)) { // still in same location, don't need to do anything newPaths.add(path); @@ -102,14 +128,14 @@ public class RecoveredReplicationSource extends ReplicationSource { if (server instanceof ReplicationSyncUp.DummyServer) { // In the case of disaster/recovery, HMaster may be shutdown/crashed before flush data // from .logs to .oldlogs. 
Loop into .logs folders and check whether a match exists - Path newPath = getReplSyncUpPath(path); + Path newPath = getReplSyncUpPath(path, fs, logDir); newPaths.add(newPath); continue; } else { // See if Path exists in the dead RS folder (there could be a chain of failures // to look at) - List deadRegionServers = this.replicationQueueInfo.getDeadRegionServers(); - LOG.info("NB dead servers : " + deadRegionServers.size()); + List deadRegionServers = replicationQueueInfo.getDeadRegionServers(); + LOG.debug("NB dead servers : " + deadRegionServers.size()); final Path walDir = FSUtils.getWALRootDir(conf); for (ServerName curDeadServerName : deadRegionServers) { final Path deadRsDirectory = @@ -119,7 +145,7 @@ public class RecoveredReplicationSource extends ReplicationSource { deadRsDirectory.suffix(AbstractFSWALProvider.SPLITTING_EXT), path.getName()) }; for (Path possibleLogLocation : locs) { LOG.info("Possible location " + possibleLogLocation.toUri().toString()); - if (manager.getFs().exists(possibleLogLocation)) { + if (fs.exists(possibleLogLocation)) { // We found the right new location LOG.info("Log " + path + " still exists at " + possibleLogLocation); newPaths.add(possibleLogLocation); @@ -133,26 +159,37 @@ public class RecoveredReplicationSource extends ReplicationSource { newPaths.add(path); } } + return hasPathChanged; + } - if (hasPathChanged) { - if (newPaths.size() != queue.size()) { // this shouldn't happen - LOG.error("Recovery queue size is incorrect"); - throw new IOException("Recovery queue size error"); - } - // put the correct locations in the queue - // since this is a recovered queue with no new incoming logs, - // there shouldn't be any concurrency issues - queue.clear(); - for (Path path : newPaths) { - queue.add(path); + public static Path locateLogInDeadRSDir(String logName, ReplicationQueueInfo replicationQueueInfo, + Configuration conf, FileSystem fs) throws IOException { + // See if Path exists in the dead RS folder (there could be a 
chain of failures + // to look at) + List deadRegionServers = replicationQueueInfo.getDeadRegionServers(); + LOG.info("NB dead servers : " + deadRegionServers.size()); + final Path walDir = FSUtils.getWALRootDir(conf); + for (ServerName curDeadServerName : deadRegionServers) { + final Path deadRsDirectory = new Path(walDir, + AbstractFSWALProvider.getWALDirectoryName(curDeadServerName.getServerName())); + Path[] locs = new Path[] { new Path(deadRsDirectory, logName), new Path( + deadRsDirectory.suffix(AbstractFSWALProvider.SPLITTING_EXT), logName) }; + for (Path possibleLogLocation : locs) { + LOG.info("Possible location " + possibleLogLocation.toUri().toString()); + if (fs.exists(possibleLogLocation)) { + // We found the right new location + LOG.info("Log " + logName + " still exists at " + possibleLogLocation); + return possibleLogLocation; + } } } + return null; } // N.B. the ReplicationSyncUp tool sets the manager.getWALDir to the root of the wal // area rather than to the wal area for a particular region server. 
- private Path getReplSyncUpPath(Path path) throws IOException { - FileStatus[] rss = fs.listStatus(manager.getLogDir()); + private static Path getReplSyncUpPath(Path path, FileSystem fs, Path logDir) throws IOException { + FileStatus[] rss = fs.listStatus(logDir); for (FileStatus rs : rss) { Path p = rs.getPath(); FileStatus[] logs = fs.listStatus(p); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index ea6c6d4..b2f6148 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -42,7 +42,6 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.TableName; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.regionserver.RSRpcServices; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java index 934a630..825b562 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java @@ -258,7 +258,16 @@ public class HBaseFsck extends Configured implements Closeable { private boolean fixReferenceFiles = false; // fix lingering reference store file private boolean fixHFileLinks = false; // fix lingering HFileLinks private boolean fixEmptyMetaCells = false; // fix (remove) empty REGIONINFO_QUALIFIER rows - private boolean fixReplication = false; // fix undeleted replication queues for removed peer + //fix undeleted replication queues for 
removed peer + private boolean fixReplicationUndeletedQueues = false; + //try reading the first log in every replication queue + private boolean checkCurrentReplicationWAL = false; + //try reading all logs in all replication queues + private boolean checkAllReplicationWAL = false; + //remove WALs from replication queues which are not in HDFS + private boolean fixMissingReplicationWAL = false; + //remove corrupted WALs (except current) from replication queues, quarantine them + private boolean fixCorruptedReplicationWAL = false; private boolean fixAny = false; // Set to true if any of the fix is required. // limit checking/fixes to listed tables, if empty attempt to check/fix all @@ -3573,10 +3582,24 @@ public class HBaseFsck extends Configured implements Closeable { ReplicationChecker checker = new ReplicationChecker(getConf(), zkw, connection, errors); checker.checkUnDeletedQueues(); - if (checker.hasUnDeletedQueues() && this.fixReplication) { + if (checker.hasUnDeletedQueues() && this.fixReplicationUndeletedQueues) { checker.fixUnDeletedQueues(); setShouldRerun(); } + + if (this.checkCurrentReplicationWAL) { + checker.checkCurrentReplicationWAL(); + } else if (this.checkAllReplicationWAL) { + checker.checkAllReplicationWAL(); + } + + if (this.fixMissingReplicationWAL) { + checker.fixMissingReplicationWAL(); + } + + if (this.fixCorruptedReplicationWAL) { + checker.fixCorruptedReplicationWAL(); + } } /** @@ -4109,7 +4132,8 @@ public class HBaseFsck extends Configured implements Closeable { HOLE_IN_REGION_CHAIN, OVERLAP_IN_REGION_CHAIN, REGION_CYCLE, DEGENERATE_REGION, ORPHAN_HDFS_REGION, LINGERING_SPLIT_PARENT, NO_TABLEINFO_FILE, LINGERING_REFERENCE_HFILE, LINGERING_HFILELINK, WRONG_USAGE, EMPTY_META_CELL, EXPIRED_TABLE_LOCK, BOUNDARIES_ERROR, - ORPHAN_TABLE_STATE, NO_TABLE_STATE, UNDELETED_REPLICATION_QUEUE, DUPE_ENDKEYS + ORPHAN_TABLE_STATE, NO_TABLE_STATE, UNDELETED_REPLICATION_QUEUE, DUPE_ENDKEYS, + REPLICATION_CHECK_CURRENT_WAL, REPLICATION_CHECK_ALL_WAL } 
void clear(); void report(String message); @@ -4536,8 +4560,26 @@ public class HBaseFsck extends Configured implements Closeable { /** * Set replication fix mode. */ - public void setFixReplication(boolean shouldFix) { - fixReplication = shouldFix; + public void setFixReplicationUndeletedQueues(boolean shouldFix) { + fixReplicationUndeletedQueues = shouldFix; + fixAny |= shouldFix; + } + + public void setCheckCurrentReplicationWAL() { + checkCurrentReplicationWAL = true; + } + + public void setCheckAllReplicationWAL() { + checkAllReplicationWAL = true; + } + + public void setFixMissingReplicationWAL(boolean shouldFix) { + fixMissingReplicationWAL = shouldFix; + fixAny |= shouldFix; + } + + public void setFixCorruptedReplicationWAL(boolean shouldFix) { + fixCorruptedReplicationWAL = shouldFix; fixAny |= shouldFix; } @@ -4818,7 +4860,17 @@ public class HBaseFsck extends Configured implements Closeable { out.println(""); out.println(" Replication options"); - out.println(" -fixReplication Deletes replication queues for removed peers"); + out.println(" -fixReplicationUndeletedQueues Deletes replication queues for removed peers." + + " Replaces the old -fixReplication"); + out.println(" -checkCurrentReplicationWAL Tries to read an entry from the oldest " + + "(currently replicated) WAL in each replication queue, at the current position"); + out.println(" -checkAllReplicationWAL Tries to completely read through all WALs " + + "in all replication queues"); + out.println(" -fixMissingReplicationWAL Removes WALs from replication queues which " + + "are not present on HDFS"); + out.println(" -fixCorruptedReplicationWAL Removes corrupted WALs from the replication " + + "queues and moves them to the corrupted dir. 
" + + "If the current WAL is corrupt, it is rolled first"); out.flush(); errors.reportError(ERROR_CODE.WRONG_USAGE, sw.toString()); @@ -5005,8 +5057,16 @@ public class HBaseFsck extends Configured implements Closeable { setCheckMetaOnly(); } else if (cmd.equals("-boundaries")) { setRegionBoundariesCheck(); - } else if (cmd.equals("-fixReplication")) { - setFixReplication(true); + } else if (cmd.equals("-fixReplicationUndeletedQueues")) { + setFixReplicationUndeletedQueues(true); + } else if (cmd.equals("-checkCurrentReplicationWAL")) { + setCheckCurrentReplicationWAL(); + } else if (cmd.equals("-checkAllReplicationWAL")) { + setCheckAllReplicationWAL(); + } else if (cmd.equals("-fixMissingReplicationWAL")) { + setFixMissingReplicationWAL(true); + } else if (cmd.equals("-fixCorruptedReplicationWAL")) { + setFixCorruptedReplicationWAL(true); } else if (cmd.startsWith("-")) { errors.reportError(ERROR_CODE.WRONG_USAGE, "Unrecognized option:" + cmd); return printUsageAndExit(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java index 9fb8459..68937f4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java @@ -18,7 +18,11 @@ package org.apache.hadoop.hbase.util.hbck; +import com.google.common.base.Throwables; + import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -26,31 +30,84 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hadoop.fs.FileSystem; +import 
org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.master.cleaner.ReplicationZKNodeCleaner; +import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException; +import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; +import org.apache.hadoop.hbase.replication.ReplicationQueues; +import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; +import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; +import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments; +import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource; +import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.HBaseFsck; import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter; +import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter.ERROR_CODE; +import org.apache.hadoop.hbase.util.RetryCounter; +import org.apache.hadoop.hbase.util.RetryCounterFactory; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.apache.hadoop.hbase.wal.WAL.Reader; +import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.wal.WALSplitter; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.zookeeper.KeeperException; /* * Check and fix undeleted replication queues for removed peerId. 
*/ @InterfaceAudience.Private public class ReplicationChecker { + private static final Log LOG = LogFactory.getLog(ReplicationChecker.class.getName()); + private static final int DEFAULT_MAX_QUEUE_ACTION_ATTEMPTS = 10; + private static final int DEFAULT_QUEUE_ACTION_ATTEMPT_SLEEP_INTERVAL = 1000; + private static final int DEFAULT_QUEUE_ACTION_ATTEMPT_MAX_SLEEP_TIME = 10000; + private final ErrorReporter errorReporter; // replicator with its queueIds for removed peers private Map> undeletedQueueIds = new HashMap<>(); // replicator with its undeleted queueIds for removed peers in hfile-refs queue private Set undeletedHFileRefsQueueIds = new HashSet<>(); private final ReplicationZKNodeCleaner cleaner; + private ReplicationQueuesClient queuesClient; + private Configuration conf; + private FileSystem fs; + private ClusterConnection connection; + private ZooKeeperWatcher zkw; + private RetryCounterFactory retryFactory; public ReplicationChecker(Configuration conf, ZooKeeperWatcher zkw, ClusterConnection connection, ErrorReporter errorReporter) throws IOException { + this.conf = conf; + this.zkw = zkw; + this.connection = connection; this.cleaner = new ReplicationZKNodeCleaner(conf, zkw, connection); this.errorReporter = errorReporter; + try { + this.queuesClient = ReplicationFactory + .getReplicationQueuesClient(new ReplicationQueuesClientArguments(conf, connection, zkw)); + this.queuesClient.init(); + } catch (Exception e) { + throw new IOException("failed to construct ReplicationQueuesClient", e); + } + fs = FileSystem.get(conf); + this.retryFactory = new RetryCounterFactory( + conf.getInt("hbase.hbck.replication.checker.logroll.attempts", + DEFAULT_MAX_QUEUE_ACTION_ATTEMPTS), + conf.getInt( + "hbase.hbck.replication.checker.logroll.interval", + DEFAULT_QUEUE_ACTION_ATTEMPT_SLEEP_INTERVAL), + conf.getInt( + "hbase.hbck.replication.checker.logroll.maxsleeptime", + DEFAULT_QUEUE_ACTION_ATTEMPT_MAX_SLEEP_TIME)); } public boolean hasUnDeletedQueues() { @@ -58,6 +115,78
@@ public class ReplicationChecker { HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE); } + public void checkCurrentReplicationWAL() throws IOException { + LOG.info("Starting checkCurrentReplicationWAL"); + checkReplicationWALs(true); + LOG.info("Finished checkCurrentReplicationWAL"); + } + + public void checkAllReplicationWAL() throws IOException { + LOG.info("Starting checkAllReplicationWAL"); + checkReplicationWALs(false); + LOG.info("Finished checkAllReplicationWAL"); + } + + public void fixMissingReplicationWAL() throws IOException { + LOG.info("Starting fixMissingReplicationWAL"); + ReplicationWALQueueVisitor fixMissingWALVisitor = new ReplicationWALQueueVisitor() { + @Override + public void visit(String serverName, String queueId, Path walDir, String logName, + int logQueueIndex) throws ReplicationException, IOException, InterruptedException { + Path locatedWal = locateWal(new Path(walDir, logName), queueId); + if (locatedWal == null) { + LOG.warn(String.format( + "Replication wal couldn't be found in HDFS. Removing from queue. 
" + + "server=%s queueId=%s logName=%s", + serverName, queueId, logName)); + removeLogFromQueue(serverName, queueId, logName, locatedWal, logQueueIndex); + } + } + }; + ReplicationWALQueueIterator it = new ReplicationWALQueueIterator(fixMissingWALVisitor, false); + it.iterate(); + LOG.info("Finished fixMissingReplicationWAL"); + } + + public void fixCorruptedReplicationWAL() throws IOException { + LOG.info("Starting fixCorruptedReplicationWAL"); + Set corruptedPaths = new HashSet<>(); + ReplicationWALQueueVisitor fixCorruptedWALVisitor = new ReplicationWALQueueVisitor() { + // multiple peers means the same log could be in different queues, we only need to check each + // unique path once + // map to false if the path is corrupt + Map checkedPaths = new HashMap<>(); + + @Override + public void visit(String serverName, String queueId, Path walDir, String logName, + int logQueueIndex) throws ReplicationException, IOException, InterruptedException { + long logPosition = queuesClient.getLogPosition(serverName, queueId, logName); + Path path = new Path(walDir, logName); + path = locateWal(path, queueId); + if (path == null) { + errorReporter.reportError(ERROR_CODE.REPLICATION_CHECK_ALL_WAL, + String.format("Could not find replication wal in hdfs: server=%s queue=%s log=%s ", + serverName, queueId, logName)); + return; + } + Boolean isGoodWal = checkedPaths.get(path); + if (isGoodWal == null) { + isGoodWal = readWal(false, fs, queueId, logPosition, path, serverName); + checkedPaths.put(path, isGoodWal); + } + if (!isGoodWal) { + corruptedPaths.add(path); + removeLogFromQueue(serverName, queueId, logName, path, logQueueIndex); + } + } + }; + ReplicationWALQueueIterator it = new ReplicationWALQueueIterator(fixCorruptedWALVisitor, false); + it.iterate(); + + WALSplitter.quarantineCorruptedLogs(new ArrayList<>(corruptedPaths), fs, conf); + LOG.info("Finished fixCorruptedReplicationWAL"); + } + public void checkUnDeletedQueues() throws IOException { undeletedQueueIds = 
cleaner.getUnDeletedQueues(); for (Entry> replicatorAndQueueIds : undeletedQueueIds.entrySet()) { @@ -92,9 +221,180 @@ public class ReplicationChecker { fixUnDeletedHFileRefsQueue(); } + private void removeLogFromQueue(String serverName, String queueId, String logName, Path path, + int logQueueIndex) throws FailedLogCloseException, IOException, InterruptedException { + try { + List logsInQueue = getSortedLogsInQueue(serverName, queueId); + // if the current WAL (being written to) is bad, roll it first + if (logQueueIndex == (logsInQueue.size() - 1)) { + connection.getAdmin().rollWALWriter(ServerName.valueOf(serverName)); + RetryCounter retrier = retryFactory.create(); + boolean hasLogRolled = false; + // wait for log to roll + while (retrier.shouldRetry()) { + logsInQueue = getSortedLogsInQueue(serverName, queueId); + // current WAL being written to is last in queue + String currentLog = logsInQueue.get(logsInQueue.size() - 1); + hasLogRolled = !logName.equals(currentLog); + if (hasLogRolled) { + LOG.info(String.format("Log rolled from %s to %s", logName, currentLog)); + hasLogRolled = true; + break; + } + retrier.sleepUntilNextRetry(); + } + if (!hasLogRolled) { + throw new IOException("Timed out while waiting to roll log: " + path); + } + } + } catch (KeeperException e) { + throw new IOException(e); + } + try { + ReplicationQueues replicationQueues = ReplicationFactory + .getReplicationQueues(new ReplicationQueuesArguments(conf, connection, zkw)); + replicationQueues.init(serverName); + replicationQueues.removeLog(queueId, logName); + LOG.info(String.format("Removed log from replication queue. 
server=%s queueId=%s logName=%s", + serverName, queueId, logName)); + } catch (Exception e) { + LOG.error("Error while trying to remove log: " + logName, e); + } + } + // returns the logs of the given queue in natural (lexicographic) order, or null if the client returned null + private List getSortedLogsInQueue(String serverName, String queueId) + throws KeeperException { + List logsInQueue = queuesClient.getLogsInQueue(serverName, queueId); + if (logsInQueue != null) { Collections.sort(logsInQueue); } // callers check for null; sorting null would throw NPE + return logsInQueue; + } + private void fixUnDeletedHFileRefsQueue() throws IOException { if (undeletedHFileRefsQueueIds != null && !undeletedHFileRefsQueueIds.isEmpty()) { cleaner.removeHFileRefsQueues(undeletedHFileRefsQueueIds); } } + + // tries to locate the given wal in either the archive directory or dead RS dir (for recovered + // queues) + private Path locateWal(Path path, String queueId) throws IOException { + if (!fs.exists(path)) { + // check if log was archived + path = AbstractFSWALProvider.getArchivedLogPath(path, conf); + if (!fs.exists(path)) { + // check dead RS dirs + return RecoveredReplicationSource.locateLogInDeadRSDir(path.getName(), + new ReplicationQueueInfo(queueId), conf, fs); + } + } + return path; + } + + private void checkReplicationWALs(boolean onlyFirstWAL) throws IOException { + ReplicationWALQueueVisitor visitor = new ReplicationWALQueueVisitor() { + // multiple peers means the same log could be in different queues, we only need to check each + // unique path once + Set checkedPaths = new HashSet<>(); + + @Override + public void visit(String serverName, String queueId, Path walDir, String logName, + int logQueueIndex) throws ReplicationException, IOException { + long logPosition = queuesClient.getLogPosition(serverName, queueId, logName); + Path path = new Path(walDir, logName); + path = locateWal(path, queueId); + if (path == null) { + errorReporter.reportError( + onlyFirstWAL ? 
ERROR_CODE.REPLICATION_CHECK_CURRENT_WAL + : ERROR_CODE.REPLICATION_CHECK_ALL_WAL, + String.format("Could not find replication wal in hdfs: server=%s queue=%s log=%s ", + serverName, queueId, logName)); + return; + } + if (checkedPaths.contains(path)) { + return; + } + readWal(onlyFirstWAL, fs, queueId, logPosition, path, serverName); + checkedPaths.add(path); + } + }; + ReplicationWALQueueIterator it = new ReplicationWALQueueIterator(visitor, onlyFirstWAL); + it.iterate(); + } + + // returns true if WAL was able to be read successfully; read failures are reported to errorReporter + private boolean readWal(boolean onlyFirstEntry, final FileSystem fs, String queueId, + long logPosition, Path path, String serverName) { + LOG.info(String.format("Attempting to read replication wal, server=%s queue=%s log=%s", + serverName, queueId, path)); + // try-with-resources so the reader is closed on both the success and the failure path + try (Reader reader = WALFactory.createReader(fs, path, conf)) { + // a non-zero position means the queue has already consumed part of this log: start there + if (logPosition != 0) { + reader.seek(logPosition); + } + int entryCount = 0; + while (reader.next() != null) { + entryCount++; + if (onlyFirstEntry) { + break; + } + } + LOG.info(String.format("Successfully read %s entries from %s", entryCount, path)); + } catch (IOException e) { + String message = (onlyFirstEntry ? "checkCurrentReplicationWAL" + : "checkAllReplicationWAL") + " error while reading replication wal: " + path.getName(); + message += " Stacktrace: " + Throwables.getStackTraceAsString(e); + errorReporter.reportError(onlyFirstEntry ? 
ERROR_CODE.REPLICATION_CHECK_CURRENT_WAL + : ERROR_CODE.REPLICATION_CHECK_ALL_WAL, + message); + return false; + } + return true; + } + + // iterates through WALs in replication queues, calling the given visitor with each WAL + private class ReplicationWALQueueIterator { + + private ReplicationWALQueueVisitor visitor; + private boolean onlyFirstWAL; + + /** + * @param onlyFirstWAL true if only the first WAL in each queue should be visited + */ + public ReplicationWALQueueIterator(ReplicationWALQueueVisitor visitor, boolean onlyFirstWAL) { + this.visitor = visitor; + this.onlyFirstWAL = onlyFirstWAL; + } + + public void iterate() throws IOException { + final Path walRootDir = FSUtils.getWALRootDir(conf); + try { + List listOfReplicators = queuesClient.getListOfReplicators(); + for (String serverName : listOfReplicators) { + List serverQueues = queuesClient.getAllQueues(serverName); + Path walDir = new Path(walRootDir, AbstractFSWALProvider.getWALDirectoryName(serverName)); + for (String queueId : serverQueues) { + List logsInQueue = getSortedLogsInQueue(serverName, queueId); + if (logsInQueue == null) { + continue; + } + for (int i = 0; i < logsInQueue.size(); i++) { + String currentLog = logsInQueue.get(i); + visitor.visit(serverName, queueId, walDir, currentLog, i); + if (i == 0 && onlyFirstWAL) { + break; + } + } + } + } + } catch (KeeperException | ReplicationException | InterruptedException e) { + throw new IOException("Error accessing replication queues during replication WAL check", e); + } + } + } + + private static interface ReplicationWALQueueVisitor { + public void visit(String serverName, String queueId, Path walDir, String logName, + int logQueueIndex) throws ReplicationException, IOException, InterruptedException; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java index 215d2ed..007b418 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java @@ -473,6 +473,30 @@ public class WALSplitter { final List corruptedLogs, final List processedLogs, final Path oldLogDir, final FileSystem fs, final Configuration conf) throws IOException { + quarantineCorruptedLogs(corruptedLogs, fs, conf); + fs.mkdirs(oldLogDir); + + for (Path p : processedLogs) { + Path newPath = AbstractFSWAL.getWALArchivePath(oldLogDir, p); + if (fs.exists(p)) { + if (!FSUtils.renameAndSetModifyTime(fs, p, newPath)) { + LOG.warn("Unable to move " + p + " to " + newPath); + } else { + LOG.info("Archived processed log " + p + " to " + newPath); + } + } + } + } + + /** + * Moves corrupted logs to corruptDir (.corrupt) + * @param corruptedLogs Logs to move to the corruptDir + * @param fs FileSystem to use + * @param conf config + * @throws IOException thrown if there are FS errors + */ + public static void quarantineCorruptedLogs(final List corruptedLogs, + final FileSystem fs, final Configuration conf) throws IOException { final Path corruptDir = new Path(FSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME); if (conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir") != null) { LOG.warn("hbase.regionserver.hlog.splitlog.corrupt.dir is deprecated. Default to " @@ -481,7 +505,6 @@ public class WALSplitter { if (!fs.mkdirs(corruptDir)) { LOG.info("Unable to mkdir " + corruptDir); } - fs.mkdirs(oldLogDir); // this method can get restarted or called multiple times for archiving // the same log files. 
@@ -495,17 +518,6 @@ public class WALSplitter { } } } - - for (Path p : processedLogs) { - Path newPath = AbstractFSWAL.getWALArchivePath(oldLogDir, p); - if (fs.exists(p)) { - if (!FSUtils.renameAndSetModifyTime(fs, p, newPath)) { - LOG.warn("Unable to move " + p + " to " + newPath); - } else { - LOG.info("Archived processed log " + p + " to " + newPath); - } - } - } } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckReplication.java new file mode 100644 index 0000000..649f163 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckReplication.java @@ -0,0 +1,354 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.util; + +import static org.junit.Assert.assertEquals; + +import com.google.common.collect.Lists; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.ReplicationFactory; +import org.apache.hadoop.hbase.replication.ReplicationQueues; +import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.MiscTests; +import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter.ERROR_CODE; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.apache.hadoop.hbase.wal.WAL; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +@Category({ MiscTests.class, MediumTests.class }) +public class TestHBaseFsckReplication extends BaseTestHBaseFsck { + @Rule + public TestName name = new TestName(); + + private static MiniHBaseCluster hbaseCluster; + + private static HRegionServer rs; + + private final static String testQueueId = "1"; + + @BeforeClass + public static void setup() throws Exception { + // make sure HMaster queue cleaner chore doesn't run + conf.setInt("hbase.master.cleaner.interval", 10 * 60 * 1000); + TestHBaseFsckOneRS.setUpBeforeClass(); + hbaseCluster = 
TEST_UTIL.getHBaseCluster(); + rs = hbaseCluster.getRegionServer(0); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TestHBaseFsckOneRS.tearDownAfterClass(); + } + + @Before + public void setUp() { + EnvironmentEdgeManager.reset(); + } + + /** + * Tests that reading from a good replication log passes checkCurrentReplicationWAL, and reading + * from a bad log fails checkCurrentReplicationWAL + */ + @Test + public void testCheckCurrentReplicationWAL() throws Exception { + final TableName tableName = TableName.valueOf(name.getMethodName()); + ReplicationQueues replQueues = setupReplicationQueue(); + try (HBaseFsck hbck = new HBaseFsck(conf)) { + setupTable(tableName); + + // get the current WAL and add to the replication queue + Path currentWalPath = addCurrentWALToQueue(tableName, replQueues, testQueueId); + + // write some data to WAL + Put put = new Put(ROWKEYS[0]); + put.addColumn(FAM, ROWKEYS[0], ROWKEYS[0]); + tbl.put(put); + + // hbck checkCurrentReplicationWAL should pass + hbck.connect(); + hbck.setCheckCurrentReplicationWAL(); + hbck.onlineHbck(); + + ArrayList errorList = assertHbckNoErrors(hbck); + + // Test a bad wal - create an empty (0-length) WAL + String nextWal = getNextWalName(currentWalPath); + createEmptyWal(nextWal); + // put empty log on queue + replQueues.removeLog(testQueueId, currentWalPath.getName()); + replQueues.addLog(testQueueId, nextWal); + + // Now the checkCurrentReplicationWAL should fail + hbck.onlineHbck(); + errorList = hbck.getErrors().getErrorList(); + assertEquals(2, errorList.size()); // the unrelated undeleted-queue error plus the WAL check error asserted below + assertEquals(ERROR_CODE.REPLICATION_CHECK_CURRENT_WAL, errorList.get(1)); + } finally { + cleanupTable(tableName); + replQueues.removeAllQueues(); + } + } + + /** + * Tests that a bad log at the end of the replication queue results in a checkAllReplicationWAL + * error + */ + @Test + public void testCheckAllReplicationWAL() throws Exception { + final TableName tableName = 
TableName.valueOf(name.getMethodName()); + ReplicationQueues replQueues = setupReplicationQueue(); + try (HBaseFsck hbck = new HBaseFsck(conf)) { + setupTable(tableName); + + // get the current WAL and add to the replication queue + Path currentWalPath = addCurrentWALToQueue(tableName, replQueues, testQueueId); + + // write some data to WAL + Put put = new Put(ROWKEYS[0]); + put.addColumn(FAM, ROWKEYS[0], ROWKEYS[0]); + tbl.put(put); + + // hbck checkAllReplicationWAL should pass + hbck.connect(); + hbck.setCheckAllReplicationWAL(); + hbck.onlineHbck(); + + ArrayList errorList; + assertHbckNoErrors(hbck); + + // Test a bad wal - create an empty (0-length) WAL + String fileName = getNextWalName(currentWalPath); + createEmptyWal(fileName); + // put bad log on queue, after the good log + replQueues.addLog(testQueueId, fileName); + + // Now the checkAllReplicationWAL should fail + hbck.onlineHbck(); + errorList = hbck.getErrors().getErrorList(); + assertEquals(2, errorList.size()); + assertEquals(ERROR_CODE.REPLICATION_CHECK_ALL_WAL, errorList.get(1)); + } finally { + cleanupTable(tableName); + replQueues.removeAllQueues(); + } + } + + /** + * Tests that a WAL that doesn't exist in HDFS is removed + */ + @Test + public void testFixMissingReplicationWAL() throws Exception { + final TableName tableName = TableName.valueOf(name.getMethodName()); + ReplicationQueues replQueues = setupReplicationQueue(); + + try (HBaseFsck hbck = new HBaseFsck(conf)) { + setupTable(tableName); + + // get the current WAL and add to the replication queue + Path currentWAL = addCurrentWALToQueue(tableName, replQueues, testQueueId); + + // fix should not remove the WAL + hbck.connect(); + hbck.setFixMissingReplicationWAL(true); + hbck.onlineHbck(); + + List logsInQueue = replQueues.getLogsInQueue(testQueueId); + assertEquals(1, logsInQueue.size()); + + // add a bad wal that doesn't exist in HDFS to head of queue + String walPrefixFromWALName = + 
AbstractFSWALProvider.getWALPrefixFromWALName(currentWAL.getName()); + String badWal = walPrefixFromWALName + ".0"; + replQueues.addLog(testQueueId, badWal); + assertEquals(2, replQueues.getLogsInQueue(testQueueId).size()); + + // fix should remove the bad wal + hbck.onlineHbck(); + logsInQueue = replQueues.getLogsInQueue(testQueueId); + assertEquals(1, logsInQueue.size()); + assertEquals(currentWAL.getName(), logsInQueue.get(0)); + } finally { + cleanupTable(tableName); + replQueues.removeAllQueues(); + } + } + + /** + * Tests that a corrupt WAL is removed from the replication queue, and moved to .corrupt dir + */ + @Test + public void testFixCorruptedReplicationWAL() throws Exception { + final TableName tableName = TableName.valueOf(name.getMethodName()); + ReplicationQueues replQueues = setupReplicationQueue(); + + try (HBaseFsck hbck = new HBaseFsck(conf)) { + setupTable(tableName); + + // get the current WAL and add to the replication queue + Path currentWAL = addCurrentWALToQueue(tableName, replQueues, testQueueId); + + // fix should not remove the WAL + hbck.connect(); + hbck.setFixCorruptedReplicationWAL(true); + hbck.onlineHbck(); + List logsInQueue = replQueues.getLogsInQueue(testQueueId); + assertEquals(1, logsInQueue.size()); + + // Test a bad wal - create an empty (0-length) WAL that comes before the good wal + String walPrefixFromWALName = + AbstractFSWALProvider.getWALPrefixFromWALName(currentWAL.getName()); + String badWal = walPrefixFromWALName + ".0"; + createEmptyWal(badWal); + // put bad log on queue, before the good log + replQueues.addLog(testQueueId, badWal); + + // fix should remove the bad wal and quarantine it + hbck.onlineHbck(); + logsInQueue = replQueues.getLogsInQueue(testQueueId); + assertEquals(1, logsInQueue.size()); + assertEquals(currentWAL.getName(), logsInQueue.get(0)); + final Path corruptDir = new Path(FSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME); + assertEquals(true, + 
TEST_UTIL.getDFSCluster().getFileSystem().exists(new Path(corruptDir, badWal))); + } finally { + cleanupTable(tableName); + replQueues.removeAllQueues(); + } + } + + /** + * Tests that if the current WAL being written (end of queue) to is corrupt, and we run + * fixCorruptedReplicationWAL, the WAL is rolled and the corrupt WAL is quarantined + */ + @Test + public void testCurrentCorruptWALRoll() throws Exception { + final TableName tableName = TableName.valueOf(name.getMethodName()); + ReplicationQueues replQueues = setupReplicationQueue(); + + List addedWals = Lists.newArrayList(); + + try (HBaseFsck hbck = new HBaseFsck(conf)) { + setupTable(tableName); + // register a wal roll listener to simulate wal getting added to replication queue after a + // roll + RegionInfo regionInfo = hbaseCluster.getRegions(tableName).get(0).getRegionInfo(); + rs.getWAL(regionInfo).registerWALActionsListener(new WALActionsListener.Base() { + @Override + public void postLogRoll(Path oldPath, Path newPath) throws IOException { + try { + LOG.info("Adding log to replication queue: " + newPath); + replQueues.addLog(testQueueId, newPath.getName()); + addedWals.add(newPath); + } catch (ReplicationException e) { + LOG.error("Error adding log to queue", e); + } + } + }); + + // get the current WAL and add to the replication queue + Path currentWAL = addCurrentWALToQueue(tableName, replQueues, testQueueId); + + // Test a bad wal - create an empty (0-length) WAL + // ensure it comes at the end of the queue, making it the current WAL + String badWal = getNextWalName(currentWAL); + createEmptyWal(badWal); + // put bad log on queue + replQueues.addLog(testQueueId, badWal); + + // fix should roll the wal, remove the bad wal and quarantine it + hbck.connect(); + hbck.setFixCorruptedReplicationWAL(true); + hbck.onlineHbck(); + List logsInQueue = replQueues.getLogsInQueue(testQueueId); + Collections.sort(logsInQueue); + assertEquals(1, addedWals.size()); // new wal should've been added + 
assertEquals(2, logsInQueue.size()); // original good wal, plus the newly rolled wal + assertEquals(currentWAL.getName(), logsInQueue.get(0)); + assertEquals(addedWals.get(0).getName(), logsInQueue.get(1)); + final Path corruptDir = new Path(FSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME); + assertEquals(true, + TEST_UTIL.getDFSCluster().getFileSystem().exists(new Path(corruptDir, badWal))); + } finally { + cleanupTable(tableName); + replQueues.removeAllQueues(); + } + } + + // Get a wal name that follows sequentially after the current wal + private String getNextWalName(Path currentWalPath) { + // wals are timestamped, current time will be the latest + String walPrefixFromWALName = + AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName()); + return walPrefixFromWALName + AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER + + EnvironmentEdgeManager.currentTime(); + } + + private void createEmptyWal(String fileName) throws IOException { + final Path walDir = new Path(FSUtils.getWALRootDir(conf), + AbstractFSWALProvider.getWALDirectoryName(rs.getServerName().toString())); + Path emptyWalPath = new Path(walDir, fileName); + TEST_UTIL.getTestFileSystem().create(emptyWalPath).close(); + } + + private ArrayList assertHbckNoErrors(HBaseFsck hbck) { + // there will be 1 unrelated error because we created a test queue without an actual peer + ArrayList errorList = hbck.getErrors().getErrorList(); + assertEquals(1, errorList.size()); + assertEquals(ERROR_CODE.UNDELETED_REPLICATION_QUEUE, errorList.get(0)); + return errorList; + } + + private Path addCurrentWALToQueue(final TableName tableName, ReplicationQueues replQueues, + String queueId) throws IOException, ReplicationException { + RegionInfo regionInfo = hbaseCluster.getRegions(tableName).get(0).getRegionInfo(); + WAL wal = hbaseCluster.getRegionServer(0).getWAL(regionInfo); + Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal); + replQueues.addLog(queueId, currentWalPath.getName()); 
+ return currentWalPath; + } + + private ReplicationQueues setupReplicationQueue() + throws Exception, IOException, ReplicationException { + ReplicationQueues replQueues = ReplicationFactory.getReplicationQueues( + new ReplicationQueuesArguments(conf, rs, TEST_UTIL.getZooKeeperWatcher())); + replQueues.init(rs.getServerName().toString()); + return replQueues; + } + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java index f83e5a8..39c8e6f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java @@ -46,7 +46,7 @@ public class HbckTestingUtil { public static HBaseFsck doFsck(Configuration conf, boolean fixAssignments, boolean fixMeta, boolean fixHdfsHoles, boolean fixHdfsOverlaps, boolean fixHdfsOrphans, boolean fixTableOrphans, boolean fixVersionFile, boolean fixReferenceFiles, boolean fixHFileLinks, - boolean fixEmptyMetaRegionInfo, boolean fixTableLocks, Boolean fixReplication, + boolean fixEmptyMetaRegionInfo, boolean fixTableLocks, Boolean fixReplicationUndeletedQueues, TableName table) throws Exception { HBaseFsck fsck = new HBaseFsck(conf, exec); try { @@ -62,7 +62,7 @@ public class HbckTestingUtil { fsck.setFixReferenceFiles(fixReferenceFiles); fsck.setFixHFileLinks(fixHFileLinks); fsck.setFixEmptyMetaCells(fixEmptyMetaRegionInfo); - fsck.setFixReplication(fixReplication); + fsck.setFixReplicationUndeletedQueues(fixReplicationUndeletedQueues); if (table != null) { fsck.includeTable(table); } -- 2.7.4