diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyService.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyService.java
index fd22780..5dbfa77 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyService.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyService.java
@@ -28,9 +28,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.BackupCopyService;
 import org.apache.hadoop.hbase.backup.BackupInfo;
@@ -38,16 +36,13 @@ import org.apache.hadoop.hbase.backup.impl.BackupManager;
 import org.apache.hadoop.hbase.backup.util.BackupServerUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.snapshot.ExportSnapshot;
 import org.apache.hadoop.mapreduce.Cluster;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.tools.DistCp;
 import org.apache.hadoop.tools.DistCpConstants;
 import org.apache.hadoop.tools.DistCpOptions;
-import org.apache.hadoop.util.ClassUtil;
 import org.apache.zookeeper.KeeperException.NoNodeException;
 /**
  * Copier for backup operation. Basically, there are 2 types of copy. One is copying from snapshot,
@@ -123,7 +118,7 @@ public class MapReduceBackupCopyService implements BackupCopyService {
   }
 
   /**
-   * Update the ongoing back token znode with new progress.
+   * Update the ongoing backup with new progress.
    * @param backupContext backup context
    *
    * @param newProgress progress
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/IncrementalTableBackupProcedure.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/IncrementalTableBackupProcedure.java
index 8b2f840..9bbc453 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/IncrementalTableBackupProcedure.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/IncrementalTableBackupProcedure.java
@@ -27,11 +27,13 @@ import java.util.List;
 import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.BackupCopyService;
 import org.apache.hadoop.hbase.backup.BackupInfo;
@@ -92,14 +94,26 @@ public class IncrementalTableBackupProcedure
 
   private List<String> filterMissingFiles(List<String> incrBackupFileList) throws IOException {
     FileSystem fs = FileSystem.get(conf);
     List<String> list = new ArrayList<String>();
-    for(String file : incrBackupFileList){
-      if(fs.exists(new Path(file))){
+    for (String file : incrBackupFileList) {
+      if (fs.exists(new Path(file))) {
+        list.add(file);
+      } else {
+        LOG.warn("Can't find file: " + file);
+      }
+    }
+    return list;
+  }
+
+  private List<String> getMissingFiles(List<String> incrBackupFileList) throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    List<String> list = new ArrayList<String>();
+    for (String file : incrBackupFileList) {
+      if (!fs.exists(new Path(file))) {
         list.add(file);
-      } else{
-        LOG.warn("Can't find file: "+file);
       }
     }
     return list;
+  }
 
   /**
@@ -109,10 +123,8 @@
   private void incrementalCopy(BackupInfo backupContext) throws Exception {
     LOG.info("Incremental copy is starting.");
 
-    // set overall backup phase: incremental_copy
     backupContext.setPhase(BackupPhase.INCREMENTAL_COPY);
 
-    // get incremental backup file list and prepare parms for DistCp
     List<String> incrBackupFileList = backupContext.getIncrBackupFileList();
 
     // filter missing files out (they have been copied by previous backups)
@@ -121,18 +133,64 @@
     strArr[strArr.length - 1] = backupContext.getHLogTargetDir();
 
     BackupCopyService copyService = BackupRestoreServerFactory.getBackupCopyService(conf);
-    int res = copyService.copy(backupContext, backupManager, conf,
-      BackupCopyService.Type.INCREMENTAL, strArr);
-
-    if (res != 0) {
-      LOG.error("Copy incremental log files failed with return code: " + res + ".");
-      throw new IOException("Failed of Hadoop Distributed Copy from " + incrBackupFileList + " to "
+    int counter = 0;
+    while (counter++ < 2) {
+      // We run DistCp at most 2 times
+      // If it fails the second time, we throw an exception
+      int res = copyService.copy(backupContext, backupManager, conf,
+        BackupCopyService.Type.INCREMENTAL, strArr);
+
+      if (res != 0) {
+        LOG.error("Copy incremental log files failed with return code: " + res + ".");
+        throw new IOException("Failed Hadoop Distributed Copy from "
+          + StringUtils.join(incrBackupFileList, ",") + " to "
           + backupContext.getHLogTargetDir());
+      }
+      List<String> missingFiles = getMissingFiles(incrBackupFileList);
+
+      if (missingFiles.isEmpty()) {
+        break;
+      } else {
+        // Repeat DistCp, some files have been moved from WALs to oldWALs during previous run
+        // update backupContext and strArr
+        if (counter == 2) {
+          String msg = "DistCp could not finish the following files: "
+            + StringUtils.join(missingFiles, ",");
+          LOG.error(msg);
+          throw new IOException(msg);
+        }
+        List<String> converted = convertFilesFromWALtoOldWAL(missingFiles);
+        incrBackupFileList.removeAll(missingFiles);
+        incrBackupFileList.addAll(converted);
+        backupContext.setIncrBackupFileList(incrBackupFileList);
+
+        // Run DistCp only for missing files (which have been moved from WALs to oldWALs
+        // during previous run)
+        strArr = converted.toArray(new String[converted.size() + 1]);
+        strArr[strArr.length - 1] = backupContext.getHLogTargetDir();
+      }
     }
-    LOG.info("Incremental copy from " + incrBackupFileList + " to "
+
+    LOG.info("Incremental copy from " + StringUtils.join(incrBackupFileList, ",") + " to "
         + backupContext.getHLogTargetDir() + " finished.");
   }
+
+  private List<String> convertFilesFromWALtoOldWAL(List<String> missingFiles) throws IOException {
+    List<String> list = new ArrayList<String>();
+    for (String path : missingFiles) {
+      if (path.indexOf(Path.SEPARATOR + HConstants.HREGION_LOGDIR_NAME) < 0) {
+        LOG.error("Copy incremental log files failed, file is missing: " + path);
+        throw new IOException("Failed Hadoop Distributed Copy to "
+          + backupContext.getHLogTargetDir() + ", file is missing: " + path);
+      }
+      list.add(path.replace(Path.SEPARATOR + HConstants.HREGION_LOGDIR_NAME,
+        Path.SEPARATOR + HConstants.HREGION_OLDLOGDIR_NAME));
+    }
+    return list;
+  }
+
   @Override
   protected Flow executeFromState(final MasterProcedureEnv env,
       final IncrementalTableBackupState state)
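
Reviewer note: the heart of this change is a bounded retry around DistCp. The first copy can race with the WAL archiver, which moves files from WALs to oldWALs while the job is running; the patch detects source files that no longer exist, rewrites their paths to the archive directory, and reruns the copy exactly once more. The sketch below is a minimal, self-contained rendering of that control flow, not HBase code: BoundedCopyRetry, copyAll, and exists are hypothetical stand-ins for the procedure, BackupCopyService.copy(), and FileSystem.exists(), and the two directory constants stand in for HConstants.HREGION_LOGDIR_NAME and HConstants.HREGION_OLDLOGDIR_NAME.

import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Minimal sketch of the bounded retry in incrementalCopy(); not HBase API. */
public class BoundedCopyRetry {

  static final String WAL_DIR = "/WALs/";        // stands in for HREGION_LOGDIR_NAME
  static final String OLD_WAL_DIR = "/oldWALs/"; // stands in for HREGION_OLDLOGDIR_NAME

  /**
   * Copies {@code files}, retrying once for files that the WAL archiver moved
   * to the old-WAL directory while the first copy was running.
   *
   * @param copyAll hypothetical hook standing in for BackupCopyService.copy()
   * @param exists  hypothetical hook standing in for FileSystem.exists()
   */
  static void copyWithRetry(List<String> files,
      Predicate<List<String>> copyAll, Predicate<String> exists) throws Exception {
    List<String> remaining = new ArrayList<>(files);
    for (int attempt = 1; attempt <= 2; attempt++) {  // at most 2 copy runs, as in the patch
      if (!copyAll.test(remaining)) {
        throw new Exception("copy job returned a non-zero exit code");
      }
      // Files that vanished from the source were moved by the archiver, not copied.
      List<String> missing = new ArrayList<>();
      for (String f : remaining) {
        if (!exists.test(f)) {
          missing.add(f);
        }
      }
      if (missing.isEmpty()) {
        return;                                       // everything landed; done
      }
      if (attempt == 2) {                             // second pass still incomplete: give up
        throw new Exception("could not copy: " + missing);
      }
      // Rewrite WAL paths to their old-WAL location and retry only the moved files.
      remaining = new ArrayList<>();
      for (String f : missing) {
        if (!f.contains(WAL_DIR)) {
          throw new Exception("file is missing and is not a WAL: " + f);
        }
        remaining.add(f.replace(WAL_DIR, OLD_WAL_DIR));
      }
    }
  }
}

One retry is enough here because a WAL file makes the WALs-to-oldWALs move at most once; after the path rewrite there is nowhere further for it to go, which is why a second incomplete pass is treated as fatal rather than retried again.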