diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreServerUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreServerUtil.java
index 1b05aa9..ddf8baa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreServerUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreServerUtil.java
@@ -282,96 +282,108 @@ public class RestoreServerUtil {
     FileSystem fileSys = tableBackupPath.getFileSystem(this.conf);
 
-    // get table descriptor first
-    HTableDescriptor tableDescriptor = null;
-
-    Path tableSnapshotPath = getTableSnapshotPath(backupRootPath, tableName, backupId);
-
-    if (fileSys.exists(tableSnapshotPath)) {
-      // snapshot path exist means the backup path is in HDFS
-      // check whether snapshot dir already recorded for target table
-      if (snapshotMap.get(tableName) != null) {
-        SnapshotDescription desc =
-            SnapshotDescriptionUtils.readSnapshotInfo(fileSys, tableSnapshotPath);
-        SnapshotManifest manifest = SnapshotManifest.open(conf, fileSys, tableSnapshotPath, desc);
-        tableDescriptor = manifest.getTableDescriptor();
-      } else {
-        tableDescriptor = getTableDesc(tableName);
-        snapshotMap.put(tableName, getTableInfoPath(tableName));
-      }
-      if (tableDescriptor == null) {
-        LOG.debug("Found no table descriptor in the snapshot dir, previous schema would be lost");
-      }
-    } else if (converted) {
-      // first check if this is a converted backup image
-      LOG.error("convert will be supported in a future jira");
-    }
-
-    Path tableArchivePath = getTableArchivePath(tableName);
-    if (tableArchivePath == null) {
-      if (tableDescriptor != null) {
-        // find table descriptor but no archive dir means the table is empty, create table and exit
-        if(LOG.isDebugEnabled()) {
-          LOG.debug("find table descriptor but no archive dir for table " + tableName
-              + ", will only create table");
-        }
-        tableDescriptor.setName(newTableName);
-        checkAndCreateTable(tableBackupPath, tableName, newTableName, null,
-          tableDescriptor, truncateIfExists);
-        return;
-      } else {
-        throw new IllegalStateException("Cannot restore hbase table because directory '"
-            + " tableArchivePath is null.");
-      }
-    }
-
-    if (tableDescriptor == null) {
-      tableDescriptor = new HTableDescriptor(newTableName);
-    } else {
-      tableDescriptor.setName(newTableName);
-    }
-
-    if (!converted) {
-      // record all region dirs:
-      // load all files in dir
-      try {
-        ArrayList<Path> regionPathList = getRegionList(tableName);
-
-        // should only try to create the table with all region informations, so we could pre-split
-        // the regions in fine grain
-        checkAndCreateTable(tableBackupPath, tableName, newTableName, regionPathList,
-          tableDescriptor, truncateIfExists);
-        if (tableArchivePath != null) {
-          // start real restore through bulkload
-          // if the backup target is on local cluster, special action needed
-          Path tempTableArchivePath = checkLocalAndBackup(tableArchivePath);
-          if (tempTableArchivePath.equals(tableArchivePath)) {
-            if(LOG.isDebugEnabled()) {
-              LOG.debug("TableArchivePath for bulkload using existPath: " + tableArchivePath);
-            }
-          } else {
-            regionPathList = getRegionList(tempTableArchivePath); // point to the tempDir
-            if(LOG.isDebugEnabled()) {
-              LOG.debug("TableArchivePath for bulkload using tempPath: " + tempTableArchivePath);
-            }
-          }
-          LoadIncrementalHFiles loader = createLoader(tempTableArchivePath, false);
-          for (Path regionPath : regionPathList) {
-            String regionName = regionPath.toString();
-            if(LOG.isDebugEnabled()) {
-              LOG.debug("Restoring HFiles from directory " + regionName);
-            }
-            String[] args = { regionName, newTableName.getNameAsString()};
-            loader.run(args);
-          }
-        }
-        // we do not recovered edits
-      } catch (Exception e) {
-        throw new IllegalStateException("Cannot restore hbase table", e);
-      }
-    } else {
-      LOG.debug("convert will be supported in a future jira");
-    }
+    try (Connection conn = ConnectionFactory.createConnection(conf);
+        HBaseAdmin hbadmin = (HBaseAdmin) conn.getAdmin();) {
+      // get table descriptor first
+      HTableDescriptor tableDescriptor = null;
+      try {
+        tableDescriptor = hbadmin.getTableDescriptor(tableName);
+      } catch (IOException ioe) {
+        LOG.debug("unable to obtain table descriptor for " + tableName, ioe);
+      }
+
+      if (tableDescriptor == null) {
+        Path tableSnapshotPath = getTableSnapshotPath(backupRootPath, tableName, backupId);
+        if (fileSys.exists(tableSnapshotPath)) {
+          // snapshot path exists means the backup path is in HDFS
+          // check whether snapshot dir already recorded for target table
+          if (snapshotMap.get(tableName) != null) {
+            SnapshotDescription desc =
+                SnapshotDescriptionUtils.readSnapshotInfo(fileSys, tableSnapshotPath);
+            SnapshotManifest manifest = SnapshotManifest.open(conf,fileSys,tableSnapshotPath,desc);
+            tableDescriptor = manifest.getTableDescriptor();
+            LOG.debug("obtained descriptor from " + manifest);
+          } else {
+            tableDescriptor = getTableDesc(tableName);
+            snapshotMap.put(tableName, getTableInfoPath(tableName));
+            LOG.debug("obtained descriptor from snapshot for " + tableName);
+          }
+          if (tableDescriptor == null) {
+            LOG.debug("Found no table descriptor in the snapshot dir, previous schema was lost");
+          }
+        } else if (converted) {
+          // first check if this is a converted backup image
+          LOG.error("convert will be supported in a future jira");
+        }
+      }
+
+      Path tableArchivePath = getTableArchivePath(tableName);
+      if (tableArchivePath == null) {
+        if (tableDescriptor != null) {
+          // find table descriptor but no archive dir => the table is empty, create table and exit
+          if(LOG.isDebugEnabled()) {
+            LOG.debug("find table descriptor but no archive dir for table " + tableName
+                + ", will only create table");
+          }
+          tableDescriptor.setName(newTableName);
+          checkAndCreateTable(hbadmin, tableBackupPath, tableName, newTableName, null,
+            tableDescriptor, truncateIfExists);
+          return;
+        } else {
+          throw new IllegalStateException(
+              "Cannot restore hbase table because tableArchivePath is null.");
+        }
+      }
+
+      if (tableDescriptor == null) {
+        LOG.debug("New descriptor for " + newTableName);
+        tableDescriptor = new HTableDescriptor(newTableName);
+      } else {
+        tableDescriptor.setName(newTableName);
+      }
+
+      if (!converted) {
+        // record all region dirs:
+        // load all files in dir
+        try {
+          ArrayList<Path> regionPathList = getRegionList(tableName);
+
+          // should only try to create the table with all region information, so we could
+          // pre-split the regions in fine grain
+          checkAndCreateTable(hbadmin, tableBackupPath, tableName, newTableName, regionPathList,
+            tableDescriptor, truncateIfExists);
+          if (tableArchivePath != null) {
+            // start real restore through bulkload
+            // if the backup target is on local cluster, special action needed
+            Path tempTableArchivePath = checkLocalAndBackup(tableArchivePath);
+            if (tempTableArchivePath.equals(tableArchivePath)) {
+              if(LOG.isDebugEnabled()) {
+                LOG.debug("TableArchivePath for bulkload using existPath: " + tableArchivePath);
+              }
+            } else {
+              regionPathList = getRegionList(tempTableArchivePath); // point to the tempDir
+              if(LOG.isDebugEnabled()) {
+                LOG.debug("TableArchivePath for bulkload using tempPath: " + tempTableArchivePath);
+              }
+            }
+            LoadIncrementalHFiles loader = createLoader(tempTableArchivePath, false);
+            for (Path regionPath : regionPathList) {
+              String regionName = regionPath.toString();
+              if(LOG.isDebugEnabled()) {
+                LOG.debug("Restoring HFiles from directory " + regionName);
+              }
+              String[] args = { regionName, newTableName.getNameAsString() };
+              loader.run(args);
+            }
+          }
+          // we do not restore recovered edits
+        } catch (Exception e) {
+          throw new IllegalStateException("Cannot restore hbase table", e);
+        }
+      } else {
+        LOG.debug("convert will be supported in a future jira");
+      }
+    }
   }
@@ -471,6 +483,7 @@ public class RestoreServerUtil {
     // By default, it is 32 and loader will fail if # of files in any region exceed this
     // limit. Bad for snapshot restore.
     this.conf.setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, Integer.MAX_VALUE);
+    this.conf.set(LoadIncrementalHFiles.SILENCE_CONF_KEY, "yes");
     LoadIncrementalHFiles loader = null;
     try {
       loader = new LoadIncrementalHFiles(this.conf);
@@ -570,15 +583,11 @@ public class RestoreServerUtil {
    * @param htd table descriptor
    * @throws IOException exception
    */
-  private void checkAndCreateTable(Path tableBackupPath, TableName tableName,
+  private void checkAndCreateTable(HBaseAdmin hbadmin, Path tableBackupPath, TableName tableName,
       TableName targetTableName, ArrayList<Path> regionDirList, HTableDescriptor htd,
       boolean truncateIfExists) throws IOException {
-    HBaseAdmin hbadmin = null;
-    Connection conn = null;
     try {
-      conn = ConnectionFactory.createConnection(conf);
-      hbadmin = (HBaseAdmin) conn.getAdmin();
       boolean createNew = false;
       if (hbadmin.tableExists(targetTableName)) {
         if(truncateIfExists) {
@@ -592,7 +601,7 @@ public class RestoreServerUtil {
       } else {
         createNew = true;
       }
-      if(createNew){
+      if (createNew){
         LOG.info("Creating target table '" + targetTableName + "'");
         // if no region directory given, create the table and return
         if (regionDirList == null || regionDirList.size() == 0) {
@@ -605,13 +614,6 @@ public class RestoreServerUtil {
       }
     } catch (Exception e) {
       throw new IOException(e);
-    } finally {
-      if (hbadmin != null) {
-        hbadmin.close();
-      }
-      if(conn != null){
-        conn.close();
-      }
     }
   }
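The RestoreServerUtil change above replaces per-call connection setup inside checkAndCreateTable with a single Connection/HBaseAdmin pair that is opened once in a try-with-resources block and handed to every helper, which is also why the finally block in the @@ -605,13 +614,6 hunk could be deleted. A minimal sketch of that pattern, assuming a hypothetical restoreOneTable() helper (the name is illustrative, not part of the patch):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;

    public class ConnectionReuseSketch {
      // Open the Connection/Admin pair once and thread it through the helpers,
      // instead of letting each helper create and close its own.
      static void restoreOneTable(Configuration conf, TableName table) throws IOException {
        try (Connection conn = ConnectionFactory.createConnection(conf);
            Admin admin = conn.getAdmin()) {
          if (!admin.tableExists(table)) {
            // in the patch, checkAndCreateTable(hbadmin, ...) runs here with the shared admin
          }
        } // try-with-resources closes admin and conn even when a restore step throws
      }
    }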
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index 5d75d56..0aefa3e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -117,9 +117,11 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       = "hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily";
   private static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
   public final static String CREATE_TABLE_CONF_KEY = "create.table";
+  public final static String SILENCE_CONF_KEY = "silence";
 
   private int maxFilesPerRegionPerFamily;
   private boolean assignSeqIds;
+  private List<String> unmatchedFamilies = new ArrayList<String>();
 
   // Source filesystem
   private FileSystem fs;
@@ -157,7 +159,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   private void usage() {
     System.err.println("usage: " + NAME + " /path/to/hfileoutputformat-output tablename" + "\n -D"
         + CREATE_TABLE_CONF_KEY + "=no - can be used to avoid creation of table by this tool\n"
-        + " Note: if you set this to 'no', then the target table must already exist in HBase\n"
+        + " Note: if you set this to 'no', then the target table must already exist in HBase\n -D"
+        + SILENCE_CONF_KEY + "=yes - can be used to ignore unmatched column families\n"
         + "\n");
   }
@@ -305,7 +308,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       throws TableNotFoundException, IOException {
     try (Admin admin = table.getConnection().getAdmin();
         RegionLocator rl = table.getRegionLocator()) {
-      doBulkLoad(hfofDir, admin, table, rl);
+      doBulkLoad(hfofDir, admin, table, rl, false);
     }
   }
@@ -315,11 +318,14 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    *
    * @param hfofDir the directory that was provided as the output path
    *   of a job using HFileOutputFormat
+   * @param admin the Admin
    * @param table the table to load into
+   * @param regionLocator region locator
+   * @param silence true to ignore unmatched column families
    * @throws TableNotFoundException if table does not yet exist
    */
   public void doBulkLoad(Path hfofDir, final Admin admin, Table table,
-      RegionLocator regionLocator) throws TableNotFoundException, IOException {
+      RegionLocator regionLocator, boolean silence) throws TableNotFoundException, IOException {
     if (!admin.isTableAvailable(regionLocator.getName())) {
       throw new TableNotFoundException("Table " + table.getName() + " is not currently available.");
@@ -342,7 +348,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
           "option, consider removing the files and bulkload again without this option. " +
           "See HBASE-13985");
     }
-    prepareHFileQueue(hfofDir, table, queue, validateHFile);
+    prepareHFileQueue(hfofDir, table, queue, validateHFile, silence);
 
     int count = 0;
@@ -427,12 +433,13 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    * @param table table to which hfiles should be loaded
    * @param queue queue which needs to be loaded into the table
    * @param validateHFile if true hfiles will be validated for its format
+   * @param silence true to ignore unmatched column families
    * @throws IOException If any I/O or network error occurred
    */
   public void prepareHFileQueue(Path hfilesDir, Table table, Deque<LoadQueueItem> queue,
-      boolean validateHFile) throws IOException {
+      boolean validateHFile, boolean silence) throws IOException {
     discoverLoadQueue(queue, hfilesDir, validateHFile);
-    validateFamiliesInHFiles(table, queue);
+    validateFamiliesInHFiles(table, queue, silence);
   }
 
   // Initialize a thread pool
@@ -448,14 +455,13 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   /**
    * Checks whether there is any invalid family name in HFiles to be bulk loaded.
    */
-  private void validateFamiliesInHFiles(Table table, Deque<LoadQueueItem> queue)
+  private void validateFamiliesInHFiles(Table table, Deque<LoadQueueItem> queue, boolean silence)
      throws IOException {
     Collection<HColumnDescriptor> families = table.getTableDescriptor().getFamilies();
     List<String> familyNames = new ArrayList<String>(families.size());
     for (HColumnDescriptor family : families) {
       familyNames.add(family.getNameAsString());
     }
-    List<String> unmatchedFamilies = new ArrayList<String>();
     Iterator<LoadQueueItem> queueIter = queue.iterator();
     while (queueIter.hasNext()) {
       LoadQueueItem lqi = queueIter.next();
@@ -470,7 +476,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
           + unmatchedFamilies + "; valid family names of table " + table.getName() + " are: "
           + familyNames;
       LOG.error(msg);
-      throw new IOException(msg);
+      if (!silence) throw new IOException(msg);
     }
   }
@@ -774,7 +780,9 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     final List<Pair<byte[], String>> famPaths =
         new ArrayList<Pair<byte[], String>>(lqis.size());
     for (LoadQueueItem lqi : lqis) {
-      famPaths.add(Pair.newPair(lqi.family, lqi.hfilePath.toString()));
+      if (!unmatchedFamilies.contains(Bytes.toString(lqi.family))) {
+        famPaths.add(Pair.newPair(lqi.family, lqi.hfilePath.toString()));
+      }
     }
 
     final RegionServerCallable<Boolean> svrCallable =
@@ -1028,7 +1036,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
 
   @Override
   public int run(String[] args) throws Exception {
-    if (args.length != 2) {
+    if (args.length < 2) {
       usage();
       return -1;
     }
@@ -1054,7 +1062,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
 
       try (Table table = connection.getTable(tableName);
           RegionLocator locator = connection.getRegionLocator(tableName)) {
-        doBulkLoad(hfofDir, admin, table, locator);
+        boolean silence = "yes".equalsIgnoreCase(getConf().get(SILENCE_CONF_KEY, ""));
+        doBulkLoad(hfofDir, admin, table, locator, silence);
       }
     }
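The silence flag wired through doBulkLoad/prepareHFileQueue above downgrades the unmatched-family check in validateFamiliesInHFiles from a hard IOException to a logged error, and tryAtomicRegionLoad then filters out HFiles whose family is not in the target table. A hedged usage sketch; the HFile directory and table name are placeholders:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
    import org.apache.hadoop.util.ToolRunner;

    public class SilentBulkLoad {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // same effect as passing -Dsilence=yes on the command line
        conf.set(LoadIncrementalHFiles.SILENCE_CONF_KEY, "yes");
        // HFiles for families missing from 'usertable' are logged and skipped
        // instead of aborting the whole load
        int rc = ToolRunner.run(conf, new LoadIncrementalHFiles(conf),
            new String[] { "/tmp/hfile-output", "usertable" });
        System.exit(rc);
      }
    }

From the shell the equivalent is: hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles -Dsilence=yes /tmp/hfile-output usertable. RestoreServerUtil sets the same key to "yes" in createLoader (hunk @@ -471,6 +483,7 above), which is what lets an incremental restore proceed after a column family was dropped.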
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
index 3982b1d..0db96be 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
@@ -70,6 +70,7 @@ public class TestBackupBase {
   protected static HBaseTestingUtility TEST_UTIL;
   protected static HBaseTestingUtility TEST_UTIL2;
   protected static TableName table1 = TableName.valueOf("table1");
+  protected static HTableDescriptor table1Desc;
   protected static TableName table2 = TableName.valueOf("table2");
   protected static TableName table3 = TableName.valueOf("table3");
   protected static TableName table4 = TableName.valueOf("table4");
@@ -209,6 +210,7 @@ public class TestBackupBase {
     HColumnDescriptor fam = new HColumnDescriptor(famName);
     desc.addFamily(fam);
     ha.createTable(desc);
+    table1Desc = desc;
     Connection conn = ConnectionFactory.createConnection(conf1);
     HTable table = (HTable) conn.getTable(table1);
     loadTable(table);
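TestBackupBase now retains the descriptor of table1 in table1Desc so that subclasses can evolve the schema between backup runs. A small sketch of the pattern, assuming a test helper of our own (addFamilySync is illustrative, not part of the patch):

    import org.apache.hadoop.hbase.HBaseTestingUtility;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.util.Bytes;

    class SchemaEvolutionSketch {
      // Adds a family to the retained descriptor and pushes the change to the
      // cluster; modifyTableSync blocks until the schema change is visible.
      static void addFamilySync(HBaseTestingUtility util, HTableDescriptor desc, String family)
          throws Exception {
        desc.addFamily(new HColumnDescriptor(Bytes.toBytes(family)));
        util.modifyTableSync(util.getAdmin(), desc);
      }
    }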
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java
index 9a2ad89..a71902b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackup.java
@@ -20,12 +20,14 @@ package org.apache.hadoop.hbase.backup;
 
 import static org.junit.Assert.assertTrue;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.BackupAdmin;
 import org.apache.hadoop.hbase.client.Connection;
@@ -59,6 +61,17 @@ public class TestIncrementalBackup extends TestBackupBase {
   public TestIncrementalBackup(Boolean b) {
   }
 
+  HTable insertIntoTable(Connection conn, byte[] family, int numRows) throws IOException {
+    HTable t1 = (HTable) conn.getTable(table1);
+    Put p1;
+    for (int i = 0; i < numRows; i++) {
+      p1 = new Put(Bytes.toBytes("row-t1" + i));
+      p1.addColumn(family, qualName, Bytes.toBytes("val" + i));
+      t1.put(p1);
+    }
+    return t1;
+  }
+
   //implement all test cases in 1 test since incremental backup/restore has dependencies
   @Test
   public void TestIncBackupRestore() throws Exception {
@@ -66,8 +79,14 @@ public class TestIncrementalBackup extends TestBackupBase {
     LOG.info("create full backup image for all tables");
     List<TableName> tables = Lists.newArrayList(table1, table2);
-    HBaseAdmin admin = null;
+    final byte[] fam3Name = Bytes.toBytes("f3");
+    table1Desc.addFamily(new HColumnDescriptor(fam3Name));
+    TEST_UTIL.modifyTableSync(TEST_UTIL.getAdmin(), table1Desc);
+
     Connection conn = ConnectionFactory.createConnection(conf1);
+    insertIntoTable(conn, fam3Name, 6).close();
+
+    HBaseAdmin admin = null;
     admin = (HBaseAdmin) conn.getAdmin();
     BackupRequest request = new BackupRequest();
@@ -77,14 +96,8 @@ public class TestIncrementalBackup extends TestBackupBase {
     assertTrue(checkSucceeded(backupIdFull));
 
     // #2 - insert some data to table
+    HTable t1 = insertIntoTable(conn, famName, NB_ROWS_IN_BATCH);
     LOG.debug("writing " + NB_ROWS_IN_BATCH + " rows to " + table1);
-    HTable t1 = (HTable) conn.getTable(table1);
-    Put p1;
-    for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
-      p1 = new Put(Bytes.toBytes("row-t1" + i));
-      p1.addColumn(famName, qualName, Bytes.toBytes("val" + i));
-      t1.put(p1);
-    }
 
     Assert.assertThat(TEST_UTIL.countRows(t1), CoreMatchers.equalTo(NB_ROWS_IN_BATCH * 2));
     t1.close();
@@ -102,6 +115,15 @@ public class TestIncrementalBackup extends TestBackupBase {
     t2.close();
     LOG.debug("written " + 5 + " rows to " + table2);
 
+    // add column family f2 to table1, and drop family f3
+    final byte[] fam2Name = Bytes.toBytes("f2");
+    table1Desc.addFamily(new HColumnDescriptor(fam2Name));
+    table1Desc.removeFamily(fam3Name);
+    TEST_UTIL.modifyTableSync(TEST_UTIL.getAdmin(), table1Desc);
+
+    HTable t3 = insertIntoTable(conn, fam2Name, 6);
+    t3.close();
+
     // #3 - incremental backup for multiple tables
     tables = Lists.newArrayList(table1, table2);
     request = new BackupRequest();
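Taken together, the test now puts table1 through a schema change on each side of the full backup: family f3 exists at full-backup time, then f2 replaces f3 before the incremental backup, so restoring the incremental image encounters HFiles for a dropped family and exercises the silence path end to end. A small verification sketch, assuming a handle to the restored table (names are illustrative):

    import org.apache.hadoop.hbase.HBaseTestingUtility;
    import org.apache.hadoop.hbase.client.Table;

    class RestoreVerificationSketch {
      // countRows is the same HBaseTestingUtility helper the test already uses
      // for its row-count assertions.
      static void assertRowCount(HBaseTestingUtility util, Table restored, int expected)
          throws Exception {
        int actual = util.countRows(restored);
        if (actual != expected) {
          throw new AssertionError("expected " + expected + " rows, got " + actual);
        }
      }
    }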