diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java
index 4d6b2a7..cbb9428 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/backup/BackupInfo.java
@@ -310,6 +310,10 @@ public class BackupInfo implements Comparable {
   }
 
   public void setIncrBackupFileList(List<String> incrBackupFileList) {
+    LOG.debug("setting incr backup file list");
+    for (String file : incrBackupFileList) {
+      LOG.debug(file);
+    }
     this.incrBackupFileList = incrBackupFileList;
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java
index bd496ce..f7f9b36 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java
@@ -187,6 +187,8 @@ public class IncrementalBackupManager {
       }
       if (tss > oldTss && tss < newTss) {
         logFiles.add(item);
+      } else {
+        LOG.debug("skipping wal " + item);
       }
     }
     return logFiles;
@@ -270,6 +272,9 @@
         // so newestTimestamps.get(host) will not be null.
         if (Long.valueOf(currentLogTS) > Long.valueOf(newestTimestamps.get(host))) {
           newestLogs.add(currentLogFile);
+        } else {
+          LOG.debug("not excluding " + currentLogFile + " " + currentLogTS + " <= "
+              + newestTimestamps.get(host));
         }
       }
     }
@@ -300,12 +305,15 @@
       if (oldTimeStamp == null) {
         if (Long.valueOf(currentLogTS) < Long.valueOf(savedStartCode)) {
           // This log file is really old, its region server was before our last backup.
+          LOG.debug("excluding old " + currentLogFile + " " + currentLogTS + " < " +savedStartCode);
           continue;
         } else {
           resultLogFiles.add(currentLogFile);
         }
       } else if (Long.valueOf(currentLogTS) > Long.valueOf(oldTimeStamp)) {
         resultLogFiles.add(currentLogFile);
+      } else {
+        LOG.debug("excluding old wal " + currentLogFile + " " + currentLogTS + " <= "+oldTimeStamp);
       }
 
       // It is possible that a host in .oldlogs is an obsolete region server
@@ -314,6 +322,7 @@
       // to include they to avoid loss of edits for backup.
       Long newTimestamp = newestTimestamps.get(host);
       if (newTimestamp != null && Long.valueOf(currentLogTS) > Long.valueOf(newTimestamp)) {
+        LOG.debug("newest log " + currentLogFile);
         newestLogs.add(currentLogFile);
       }
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyService.java
index 5dbfa77..4ed01ff 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyService.java
@@ -22,13 +22,16 @@ import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.math.BigDecimal;
 import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.backup.BackupCopyService;
 import org.apache.hadoop.hbase.backup.BackupInfo;
@@ -38,6 +41,8 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.snapshot.ExportSnapshot;
 import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.CounterGroup;
+import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.tools.DistCp;
@@ -239,7 +244,8 @@
               new BigDecimal(newProgress * 100).setScale(1, BigDecimal.ROUND_HALF_UP);
             String newProgressStr = progressData + "%";
-            LOG.info("Progress: " + newProgressStr);
+            LOG.info("Progress: " + newProgressStr + " subTask: " + subTaskPercntgInWholeTask
+                + " mapProgress: " + job.mapProgress());
 
             // accumulate the overall backup progress
             progressDone = newProgress;
@@ -249,9 +255,12 @@
               bytesCopied);
             LOG.debug("Backup progress data updated to hbase:backup: \"Progress: "
                 + newProgressStr + " - " + bytesCopied + " bytes copied.\"");
-
+          } catch (Throwable t) {
+            LOG.debug("distcp " + job.getJobID() + " encountered error", t);
+            throw t;
           } finally {
             if (!fieldSubmitted.getBoolean(this)) {
+              LOG.debug("cleaning up " + job.getJobID());
              methodCleanup.invoke(this);
             }
           }
@@ -259,7 +268,13 @@
           String jobID = job.getJobID().toString();
           job.getConfiguration().set(DistCpConstants.CONF_LABEL_DISTCP_JOB_ID, jobID);
 
-          LOG.debug("DistCp job-id: " + jobID);
+          LOG.debug("DistCp job-id: " + jobID + " completed: " + job.isComplete() + " "
+              + job.isSuccessful());
+          Counters ctrs = job.getCounters();
+          LOG.debug(ctrs);
+          if (job.isComplete() && !job.isSuccessful()) {
+            throw new Exception("DistCp job-id: " + jobID + " failed");
+          }
 
           return job;
         }
@@ -292,21 +307,28 @@
           LOG.debug("Doing COPY_TYPE_DISTCP");
           setSubTaskPercntgInWholeTask(1f);
 
-          BackupDistCp distcp = new BackupDistCp(new Configuration(conf), null, context,
+          Configuration config = new Configuration(conf);
+          BackupDistCp distcp = new BackupDistCp(config, null, context,
             backupManager);
           // Handle a special case where the source file is a single file.
           // In this case, distcp will not create the target dir. It just take the
           // target as a file name and copy source file to the target (as a file name).
           // We need to create the target dir before run distcp.
           LOG.debug("DistCp options: " + Arrays.toString(options));
-          if (options.length == 2) {
-            Path dest = new Path(options[1]);
-            FileSystem destfs = dest.getFileSystem(conf);
-            if (!destfs.exists(dest)) {
-              destfs.mkdirs(dest);
-            }
+          Path dest = new Path(options[options.length-1]);
+          FileSystem destfs = dest.getFileSystem(conf);
+          if (!destfs.exists(dest)) {
+            destfs.mkdirs(dest);
           }
           res = distcp.run(options);
+
+          LOG.debug("list of " + dest + " for distcp " + res);
+          FileSystem fs = dest.getFileSystem(conf);
+          RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(dest);
+          while (iter.hasNext()) {
+            LocatedFileStatus stat = iter.next();
+            LOG.debug(stat);
+          }
         }
 
         return res;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/IncrementalTableBackupProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/IncrementalTableBackupProcedure.java
index 6e90309..05a63ce 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/IncrementalTableBackupProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/IncrementalTableBackupProcedure.java
@@ -101,7 +101,7 @@ public class IncrementalTableBackupProcedure
         LOG.warn("Can't find file: " + file);
       }
     }
-    return list;
+    return list;
   }
 
   private List getMissingFiles(List incrBackupFileList) throws IOException {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java
index 58cd4b2..b86c5e0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/LogRollBackupSubprocedure.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.errorhandling.ForeignException;
 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
 import org.apache.hadoop.hbase.procedure.ProcedureMember;
 import org.apache.hadoop.hbase.procedure.Subprocedure;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
 
@@ -75,7 +76,9 @@ public class LogRollBackupSubprocedure extends Subprocedure {
       long filenum = hlog.getFilenum();
 
       LOG.info("Trying to roll log in backup subprocedure, current log number: " + filenum);
-      hlog.rollWriter(true);
+      ((HRegionServer)rss).walRoller.requestRollAll();
+      // hlog.rollWriter(true);
+      Thread.sleep(19000);
       LOG.info("After roll log in backup subprocedure, current log number: " + hlog.getFilenum());
 
       Connection connection = rss.getConnection();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 81e56b2..30712c7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -368,7 +368,7 @@ public class HRegionServer extends HasThread implements
   // WAL roller. log is protected rather than private to avoid
   // eclipse warning when accessed by inner classes
-  final LogRoller walRoller;
+  public final LogRoller walRoller;
   // Lazily initialized if this RegionServer hosts a meta table.
   final AtomicReference metawalRoller = new AtomicReference();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 31f05c2..1c096dc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -662,6 +662,7 @@
   private void preemptiveSync(final ProtobufLogWriter nextWriter) {
     long startTimeNanos = System.nanoTime();
     try {
+      LOG.debug("syncing writer " + ((WriterBase)nextWriter).path);
       nextWriter.sync();
       postSync(System.nanoTime() - startTimeNanos, 0);
     } catch (IOException e) {
@@ -858,6 +859,7 @@
     try {
       if (this.writer != null) {
         Trace.addTimelineAnnotation("closing writer");
+        LOG.debug("closing " + ((WriterBase)writer).path);
         this.writer.close();
         Trace.addTimelineAnnotation("writer closed");
       }
@@ -1085,6 +1087,7 @@
       LOG.debug("Closing WAL writer in " + FSUtils.getPath(fullPathLogDir));
     }
     if (this.writer != null) {
+      LOG.debug("closing " + ((WriterBase)writer).path);
       this.writer.close();
       this.writer = null;
     }
@@ -1271,6 +1274,7 @@
       Throwable lastException = null;
       try {
         Trace.addTimelineAnnotation("syncing writer");
+        LOG.debug("syncing writer " + ((WriterBase)writer).path);
         writer.sync();
         Trace.addTimelineAnnotation("writer synced");
         currentSequence = updateHighestSyncedSequence(currentSequence);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java
index 8188e02..2939655 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java
@@ -39,10 +39,12 @@ public abstract class WriterBase implements DefaultWALProvider.Writer {
 
   protected CompressionContext compressionContext;
   protected Configuration conf;
+  protected Path path;
 
   @Override
   public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable)
       throws IOException {
     this.conf = conf;
+    this.path = path;
   }
 
   public boolean initializeCompressionContext(Configuration conf, Path path) throws IOException {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedGroupingStrategy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedGroupingStrategy.java
index 65c774e..9f8377e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedGroupingStrategy.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedGroupingStrategy.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.hbase.wal.RegionGroupingProvider.RegionGroupingStrategy
 public class BoundedGroupingStrategy implements RegionGroupingStrategy{
 
   static final String NUM_REGION_GROUPS = "hbase.wal.regiongrouping.numgroups";
-  static final int DEFAULT_NUM_REGION_GROUPS = 2;
+  static final int DEFAULT_NUM_REGION_GROUPS = 4;
 
   private ConcurrentHashMap groupNameCache = new ConcurrentHashMap();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
index 08f42aa..8296294 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
@@ -74,7 +74,7 @@ public class WALFactory {
    * Maps between configuration names for providers and implementation classes.
    */
   static enum Providers {
-    defaultProvider(DefaultWALProvider.class),
+    defaultProvider(RegionGroupingProvider.class),
     filesystem(DefaultWALProvider.class),
     multiwal(RegionGroupingProvider.class);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
index 492a0f2..78d8290 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
@@ -77,7 +77,7 @@ public class TestBackupBase {
   protected static TableName table3_restore = TableName.valueOf("ns3:table3_restore");
   protected static TableName table4_restore = TableName.valueOf("ns4:table4_restore");
 
-  protected static final int NB_ROWS_IN_BATCH = 999;
+  protected static final int NB_ROWS_IN_BATCH = 199;
   protected static final byte[] qualName = Bytes.toBytes("q1");
   protected static final byte[] famName = Bytes.toBytes("f");
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupLogCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupLogCleaner.java
index 3ef68e6..e6bd675 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupLogCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupLogCleaner.java
@@ -55,6 +55,13 @@ import com.google.common.collect.Lists;
 public class TestBackupLogCleaner extends TestBackupBase {
   private static final Log LOG = LogFactory.getLog(TestBackupLogCleaner.class);
 
+  void logFiles(List<FileStatus> files, String subj) {
+    LOG.debug(subj + " WAL");
+    for (FileStatus f : files) {
+      LOG.debug(f.getPath());
+    }
+  }
+
   // implements all test cases in 1 test since incremental full backup/
   // incremental backup has dependencies
   @Test
@@ -95,7 +102,9 @@
 
     // New list of wal files is greater than the previous one,
     // because new wal per RS have been opened after full backup
-    assertTrue(walFiles.size() < newWalFiles.size());
+    logFiles(walFiles, "prev");
+    logFiles(newWalFiles, "curr");
+    assertTrue(walFiles.size() <= newWalFiles.size());
     Connection conn = ConnectionFactory.createConnection(conf1);
     // #2 - insert some data to table
     HTable t1 = (HTable) conn.getTable(table1);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java
index 0f35026..59933ee 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java
@@ -61,6 +61,7 @@
     assertTrue(checkSucceeded(backupIdFull));
 
     // #2 - insert some data to table
+    LOG.debug("writing " + NB_ROWS_IN_BATCH + " rows to " + table1);
     HTable t1 = (HTable) conn.getTable(table1);
     Put p1;
     for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
@@ -71,6 +72,7 @@ public class TestIncrementalBackup extends TestBackupBase {
     Assert.assertThat(TEST_UTIL.countRows(t1), CoreMatchers.equalTo(NB_ROWS_IN_BATCH * 2));
     t1.close();
+    LOG.debug("written " + NB_ROWS_IN_BATCH + " rows to " + table1);
 
     HTable t2 = (HTable) conn.getTable(table2);
     Put p2;
@@ -82,6 +84,7 @@
     Assert.assertThat(TEST_UTIL.countRows(t2), CoreMatchers.equalTo(NB_ROWS_IN_BATCH + 5));
     t2.close();
+    LOG.debug("written " + NB_ROWS_IN_BATCH + " rows to " + table2);
 
     // #3 - incremental backup for multiple tables
     tables = Lists.newArrayList(table1, table2, table3);
diff --git a/pom.xml b/pom.xml
index b3fa787..4805ce2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -3042,6 +3042,10 @@
+      apache RC
+      https://repository.apache.org/content/repositories/orgapachehadoop-1040
+
+
       project.local
       project
       file:${project.basedir}/src/main/site/resources/repo