diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 2285292..9e60b43 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -68,6 +68,7 @@ import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.ipc.RemoteException;
 
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -829,6 +830,103 @@ public class ReplicationSource extends Thread
     return this.currentPath != null;
   }
 
+  /**
+   * Handles a {@link FileNotFoundException} raised while opening {@link #currentPath}.
+   * <p>
+   * Recovered queue: if the WAL still exists in a dead region server's WAL directory (or its
+   * splitting directory) this returns {@code true} without reopening it. When running under the
+   * ReplicationSyncUp tool, every subdirectory of {@code manager.getLogDir()} is also searched
+   * for a file with the same name; a match becomes the new
+   * {@link #currentPath} and is reopened. If the WAL is found nowhere, an {@link IOException}
+   * is thrown.
+   * <p>
+   * Normal queue: if the WAL was moved to the archive (old log) directory it becomes the new
+   * {@link #currentPath} and is reopened; otherwise nothing changes.
+   *
+   * @param fnfe the exception raised while opening {@link #currentPath}
+   * @param sleepMultiplier forwarded to {@link #openReader(int)} when the reader is reopened
+   * @return {@code true} whenever this method returns normally
+   * @throws IOException if the WAL of a recovered queue cannot be found in any known location,
+   *     or if a file system call fails
+   */
+  private boolean handleFileNotFound(FileNotFoundException fnfe, int sleepMultiplier)
+      throws IOException {
+    if (this.replicationQueueInfo.isQueueRecovered()) {
+      // We didn't find the log in the archive directory, look if it still
+      // exists in the dead RS folder (there could be a chain of failures
+      // to look at)
+      List<String> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
+      LOG.info("NB dead servers : " + deadRegionServers.size());
+      final Path rootDir = FSUtils.getRootDir(conf);
+      for (String curDeadServerName : deadRegionServers) {
+        final Path deadRsDirectory = new Path(rootDir,
+            DefaultWALProvider.getWALDirectoryName(curDeadServerName));
+        Path[] locs = new Path[] {
+            new Path(deadRsDirectory, currentPath.getName()),
+            new Path(deadRsDirectory.suffix(DefaultWALProvider.SPLITTING_EXT),
+                currentPath.getName()),
+        };
+        for (Path possibleLogLocation : locs) {
+          LOG.info("Possible location " + possibleLogLocation.toUri().toString());
+          if (manager.getFs().exists(possibleLogLocation)) {
+            // We found the right new location
+            LOG.info("Log " + this.currentPath + " still exists at " +
+                possibleLogLocation);
+            // Breaking here will make us sleep since reader is null
+            // TODO why don't we need to set currentPath and call openReader here?
+            return true;
+          }
+        }
+      }
+      // In the case of disaster/recovery, HMaster may be shutdown/crashed before flush data
+      // from .logs to .oldlogs. Loop into .logs folders and check whether a match exists
+      if (stopper instanceof ReplicationSyncUp.DummyServer) {
+        // N.B. the ReplicationSyncUp tool sets the manager.getLogDir to the root of the wal
+        //      area rather than to the wal area for a particular region server.
+        FileStatus[] rss = fs.listStatus(manager.getLogDir());
+        for (FileStatus rs : rss) {
+          Path p = rs.getPath();
+          FileStatus[] logs = fs.listStatus(p);
+          for (FileStatus log : logs) {
+            // Build every candidate from the region server directory itself; reassigning p
+            // here would nest each candidate under the previously checked file.
+            Path candidate = new Path(p, log.getPath().getName());
+            if (candidate.getName().equals(currentPath.getName())) {
+              currentPath = candidate;
+              LOG.info("Log " + currentPath.getName() + " found at " + currentPath);
+              // Open the log at the new location
+              this.openReader(sleepMultiplier);
+              return true;
+            }
+          }
+        }
+      }
+
+      // TODO What happens if the log was missing from every single location?
+      // Although we need to check a couple of times as the log could have
+      // been moved by the master between the checks
+      // It can also happen if a recovered queue wasn't properly cleaned,
+      // such that the znode pointing to a log exists but the log was
+      // deleted a long time ago.
+      // For the moment, we'll throw the IO and processEndOfFile
+      throw new IOException("File from recovered queue is " +
+          "nowhere to be found", fnfe);
+    } else {
+      // If the log was archived, continue reading from there
+      Path archivedLogLocation =
+          new Path(manager.getOldLogDir(), currentPath.getName());
+      if (manager.getFs().exists(archivedLogLocation)) {
+        currentPath = archivedLogLocation;
+        LOG.info("Log " + this.currentPath + " was moved to " +
+            archivedLogLocation);
+        // Open the log at the new location
+        this.openReader(sleepMultiplier);
+      }
+      // TODO What happens the log is missing in both places?
+    }
+    return true;
+  }
+
   /**
    * Open a reader on the current path
    *
@@ -843,78 +941,15 @@ public class ReplicationSource extends Thread
         }
         this.reader = repLogReader.openReader(this.currentPath);
       } catch (FileNotFoundException fnfe) {
-        if (this.replicationQueueInfo.isQueueRecovered()) {
-          // We didn't find the log in the archive directory, look if it still
-          // exists in the dead RS folder (there could be a chain of failures
-          // to look at)
-          List<String> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
-          LOG.info("NB dead servers : " + deadRegionServers.size());
-          final Path rootDir = FSUtils.getRootDir(conf);
-          for (String curDeadServerName : deadRegionServers) {
-            final Path deadRsDirectory = new Path(rootDir,
-                DefaultWALProvider.getWALDirectoryName(curDeadServerName));
-            Path[] locs = new Path[] {
-                new Path(deadRsDirectory, currentPath.getName()),
-                new Path(deadRsDirectory.suffix(DefaultWALProvider.SPLITTING_EXT),
-                    currentPath.getName()),
-            };
-            for (Path possibleLogLocation : locs) {
-              LOG.info("Possible location " + possibleLogLocation.toUri().toString());
-              if (manager.getFs().exists(possibleLogLocation)) {
-                // We found the right new location
-                LOG.info("Log " + this.currentPath + " still exists at " +
-                    possibleLogLocation);
-                // Breaking here will make us sleep since reader is null
-                // TODO why don't we need to set currentPath and call openReader here?
-                return true;
-              }
-            }
-          }
-          // In the case of disaster/recovery, HMaster may be shutdown/crashed before flush data
-          // from .logs to .oldlogs. Loop into .logs folders and check whether a match exists
-          if (stopper instanceof ReplicationSyncUp.DummyServer) {
-            // N.B. the ReplicationSyncUp tool sets the manager.getLogDir to the root of the wal
-            //      area rather than to the wal area for a particular region server.
-            FileStatus[] rss = fs.listStatus(manager.getLogDir());
-            for (FileStatus rs : rss) {
-              Path p = rs.getPath();
-              FileStatus[] logs = fs.listStatus(p);
-              for (FileStatus log : logs) {
-                p = new Path(p, log.getPath().getName());
-                if (p.getName().equals(currentPath.getName())) {
-                  currentPath = p;
-                  LOG.info("Log " + currentPath.getName() + " found at " + currentPath);
-                  // Open the log at the new location
-                  this.openReader(sleepMultiplier);
-                  return true;
-                }
-              }
-            }
-          }
-
-          // TODO What happens if the log was missing from every single location?
-          // Although we need to check a couple of times as the log could have
-          // been moved by the master between the checks
-          // It can also happen if a recovered queue wasn't properly cleaned,
-          // such that the znode pointing to a log exists but the log was
-          // deleted a long time ago.
-          // For the moment, we'll throw the IO and processEndOfFile
-          throw new IOException("File from recovered queue is " +
-              "nowhere to be found", fnfe);
-        } else {
-          // If the log was archived, continue reading from there
-          Path archivedLogLocation =
-              new Path(manager.getOldLogDir(), currentPath.getName());
-          if (manager.getFs().exists(archivedLogLocation)) {
-            currentPath = archivedLogLocation;
-            LOG.info("Log " + this.currentPath + " was moved to " +
-                archivedLogLocation);
-            // Open the log at the new location
-            this.openReader(sleepMultiplier);
-
-          }
-          // TODO What happens the log is missing in both places?
-        }
+        return handleFileNotFound(fnfe, sleepMultiplier);
+      } catch (RemoteException re) {
+        // A missing file can also surface as a RemoteException wrapping a
+        // FileNotFoundException; unwrap it and reuse the same handling.
+        IOException ioe = re.unwrapRemoteException(FileNotFoundException.class);
+        if (!(ioe instanceof FileNotFoundException)) {
+          throw ioe;
+        }
+        return handleFileNotFound((FileNotFoundException) ioe, sleepMultiplier);
       }
     } catch (LeaseNotRecoveredException lnre) {
       // HBASE-15019 the WAL was not closed due to some hiccup.