diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreServerFactory.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreServerFactory.java index 25ec9d9..7644a4d 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreServerFactory.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/BackupRestoreServerFactory.java @@ -36,15 +36,15 @@ public final class BackupRestoreServerFactory { } /** - * Gets incremental restore service + * Gets backup restore service * @param conf - configuration - * @return incremental backup service instance + * @return backup restore service instance */ - public static IncrementalRestoreService getIncrementalRestoreService(Configuration conf) { - Class<? extends IncrementalRestoreService> cls = + public static RestoreService getRestoreService(Configuration conf) { + Class<? extends RestoreService> cls = conf.getClass(HBASE_INCR_RESTORE_IMPL_CLASS, MapReduceRestoreService.class, - IncrementalRestoreService.class); - IncrementalRestoreService service = ReflectionUtils.newInstance(cls, conf); + RestoreService.class); + RestoreService service = ReflectionUtils.newInstance(cls, conf); service.setConf(conf); return service; } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java index 1fc0a92..a130a9b 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HBackupFileSystem.java @@ -20,18 +20,25 @@ package org.apache.hadoop.hbase.backup; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.backup.impl.BackupManifest; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; /** * View to an on-disk Backup Image FileSytem @@ -77,6 +84,27 @@ public class HBackupFileSystem { return new Path(getTableBackupDir(backupRootPath.toString(), backupId, tableName)); } + + public static List<HRegionInfo> loadRegionInfos(TableName tableName, + Path backupRootPath, String backupId, Configuration conf) throws IOException + { + Path backupTableRoot = getTableBackupPath(tableName, backupRootPath, backupId); + FileSystem fs = backupTableRoot.getFileSystem(conf); + RemoteIterator<LocatedFileStatus> it = fs.listFiles(backupTableRoot, true); + List<HRegionInfo> infos = new ArrayList<HRegionInfo>(); + while(it.hasNext()) { + LocatedFileStatus lfs = it.next(); + if(lfs.isFile() && lfs.getPath().toString().endsWith(HRegionFileSystem.REGION_INFO_FILE)) { + Path regionDir = lfs.getPath().getParent(); + HRegionInfo info = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir); + infos.add(info); + } + } + + Collections.sort(infos); + return infos; + } + /** * Given the backup root dir and the backup id, return the log file location for an incremental * backup.
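For illustration, a minimal caller of the renamed factory method and the new HBackupFileSystem.loadRegionInfos() helper might look like the sketch below; this driver class is not part of the patch, and the table name, backup root path, backup id, and input directories are hypothetical values.

// A minimal sketch, assuming a standalone driver class; all literal values below are hypothetical.
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupRestoreServerFactory;
import org.apache.hadoop.hbase.backup.HBackupFileSystem;
import org.apache.hadoop.hbase.backup.RestoreService;

public class RestoreServiceUsageSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    TableName table = TableName.valueOf("t1");                 // hypothetical table
    Path backupRoot = new Path("hdfs:///backup");              // hypothetical backup root
    String backupId = "backup_1474000000000";                  // hypothetical backup id

    // Region metadata captured in the backup image, sorted by HRegionInfo ordering.
    List<HRegionInfo> regions =
        HBackupFileSystem.loadRegionInfos(table, backupRoot, backupId, conf);
    System.out.println("Backup image has " + regions.size() + " region(s)");

    // The factory now hands back the generalized RestoreService; the trailing boolean
    // on run() selects full-backup (HFile) vs. incremental (WAL) restore.
    RestoreService service = BackupRestoreServerFactory.getRestoreService(conf);
    Path[] inputDirs = { new Path("hdfs:///backup/region-dirs") };  // hypothetical input dirs
    service.run(inputDirs, new TableName[] { table }, new TableName[] { table }, true);
  }
}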
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/IncrementalRestoreService.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/IncrementalRestoreService.java deleted file mode 100644 index ae48480..0000000 --- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/IncrementalRestoreService.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.backup; - -import java.io.IOException; - -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; - -@InterfaceAudience.Private -@InterfaceStability.Evolving -public interface IncrementalRestoreService extends Configurable{ - - /** - * Run restore operation - * @param logDirectoryPaths - path array of WAL log directories - * @param fromTables - from tables - * @param toTables - to tables - * @throws IOException - */ - public void run(Path[] logDirectoryPaths, TableName[] fromTables, TableName[] toTables) - throws IOException; -} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreService.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreService.java new file mode 100644 index 0000000..2da98c2 --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/RestoreService.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.backup; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +@InterfaceAudience.Private +@InterfaceStability.Evolving + +/** + * Backup restore service interface + * Concrete implementation is provided by backup provider. + */ + +public interface RestoreService extends Configurable{ + + /** + * Run restore operation + * @param dirPaths - path array of WAL log directories + * @param fromTables - from tables + * @param toTables - to tables + * @param fullBackupRestore - full backup restore + * @throws IOException + */ + public void run(Path[] dirPaths, TableName[] fromTables, + TableName[] toTables, boolean fullBackupRestore) + throws IOException; +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesProcedure.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesProcedure.java deleted file mode 100644 index 7ac11de..0000000 --- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesProcedure.java +++ /dev/null @@ -1,402 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hbase.backup.impl; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.TreeSet; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.commons.lang.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.MetaTableAccessor; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.backup.BackupType; -import org.apache.hadoop.hbase.backup.HBackupFileSystem; -import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage; -import org.apache.hadoop.hbase.backup.util.RestoreServerUtil; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.TableState; -import org.apache.hadoop.hbase.master.MasterServices; -import org.apache.hadoop.hbase.master.TableStateManager; -import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; -import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface; -import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; -import org.apache.hadoop.hbase.protobuf.generated.MasterProtos; -import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RestoreTablesState; -import org.apache.hadoop.security.UserGroupInformation; - -@InterfaceAudience.Private -public class RestoreTablesProcedure - extends StateMachineProcedure - implements TableProcedureInterface { - private static final Log LOG = LogFactory.getLog(RestoreTablesProcedure.class); - - private final AtomicBoolean aborted = new AtomicBoolean(false); - private Configuration conf; - private String backupId; - private List sTableList; - private List tTableList; - private String targetRootDir; - private boolean isOverwrite; - - public RestoreTablesProcedure() { - // Required by the Procedure framework to create the procedure on replay - } - - public RestoreTablesProcedure(final MasterProcedureEnv env, - final String targetRootDir, String backupId, List sTableList, - List tTableList, boolean isOverwrite) throws IOException { - this.targetRootDir = targetRootDir; - this.backupId = backupId; - this.sTableList = sTableList; - this.tTableList = tTableList; - if (tTableList == null || tTableList.isEmpty()) { - this.tTableList = sTableList; - } - this.isOverwrite = isOverwrite; - this.setOwner(env.getRequestUser().getUGI().getShortUserName()); - } - - @Override - public byte[] getResult() { - return null; - } - - /** - * Validate target Tables - * @param conn connection - * @param mgr table state manager - * @param tTableArray: target tables - * @param isOverwrite overwrite existing table - * @throws IOException exception - */ - private void checkTargetTables(Connection conn, TableStateManager mgr, TableName[] tTableArray, - boolean isOverwrite) - throws IOException { - ArrayList existTableList = new ArrayList<>(); - ArrayList disabledTableList = new ArrayList<>(); - - // check if the tables already exist - for (TableName tableName : tTableArray) 
{ - if (MetaTableAccessor.tableExists(conn, tableName)) { - existTableList.add(tableName); - if (mgr.isTableState(tableName, TableState.State.DISABLED, TableState.State.DISABLING)) { - disabledTableList.add(tableName); - } - } else { - LOG.info("HBase table " + tableName - + " does not exist. It will be created during restore process"); - } - } - - if (existTableList.size() > 0) { - if (!isOverwrite) { - LOG.error("Existing table (" + existTableList + ") found in the restore target, please add " - + "\"-overwrite\" option in the command if you mean to restore to these existing tables"); - throw new IOException("Existing table found in target while no \"-overwrite\" " - + "option found"); - } else { - if (disabledTableList.size() > 0) { - LOG.error("Found offline table in the restore target, " - + "please enable them before restore with \"-overwrite\" option"); - LOG.info("Offline table list in restore target: " + disabledTableList); - throw new IOException( - "Found offline table in the target when restore with \"-overwrite\" option"); - } - } - } - } - - /** - * Restore operation handle each backupImage in iterator - * @param conn the Connection - * @param it: backupImage iterator - ascending - * @param sTable: table to be restored - * @param tTable: table to be restored to - * @param truncateIfExists truncate table if it exists - * @throws IOException exception - */ - private void restoreImages(MasterServices svc, Iterator it, TableName sTable, - TableName tTable, boolean truncateIfExists) throws IOException { - - // First image MUST be image of a FULL backup - BackupImage image = it.next(); - - String rootDir = image.getRootDir(); - String backupId = image.getBackupId(); - Path backupRoot = new Path(rootDir); - - // We need hFS only for full restore (see the code) - RestoreServerUtil restoreTool = new RestoreServerUtil(conf, backupRoot, backupId); - BackupManifest manifest = HBackupFileSystem.getManifest(sTable, conf, backupRoot, backupId); - - Path tableBackupPath = HBackupFileSystem.getTableBackupPath(sTable, backupRoot, backupId); - - // TODO: convert feature will be provided in a future JIRA - boolean converted = false; - String lastIncrBackupId = null; - List logDirList = null; - - // Scan incremental backups - if (it.hasNext()) { - // obtain the backupId for most recent incremental - logDirList = new ArrayList(); - while (it.hasNext()) { - BackupImage im = it.next(); - String logBackupDir = HBackupFileSystem.getLogBackupDir(im.getRootDir(), im.getBackupId()); - logDirList.add(logBackupDir); - lastIncrBackupId = im.getBackupId(); - } - } - if (manifest.getType() == BackupType.FULL || converted) { - LOG.info("Restoring '" + sTable + "' to '" + tTable + "' from " - + (converted ? 
"converted" : "full") + " backup image " + tableBackupPath.toString()); - restoreTool.fullRestoreTable(svc, tableBackupPath, sTable, tTable, - converted, truncateIfExists, lastIncrBackupId); - } else { // incremental Backup - throw new IOException("Unexpected backup type " + image.getType()); - } - - // The rest one are incremental - if (logDirList != null) { - String logDirs = StringUtils.join(logDirList, ","); - LOG.info("Restoring '" + sTable + "' to '" + tTable - + "' from log dirs: " + logDirs); - String[] sarr = new String[logDirList.size()]; - logDirList.toArray(sarr); - Path[] paths = org.apache.hadoop.util.StringUtils.stringToPath(sarr); - restoreTool.incrementalRestoreTable(svc, tableBackupPath, paths, - new TableName[] { sTable }, new TableName[] { tTable }, lastIncrBackupId); - } - LOG.info(sTable + " has been successfully restored to " + tTable); - } - - /** - * Restore operation. Stage 2: resolved Backup Image dependency - * @param svc MasterServices - * @param backupManifestMap : tableName, Manifest - * @param sTableArray The array of tables to be restored - * @param tTableArray The array of mapping tables to restore to - * @param isOverwrite overwrite - * @return set of BackupImages restored - * @throws IOException exception - */ - private void restoreStage(MasterServices svc, HashMap backupManifestMap, - TableName[] sTableArray, TableName[] tTableArray, boolean isOverwrite) throws IOException { - TreeSet restoreImageSet = new TreeSet(); - boolean truncateIfExists = isOverwrite; - try { - for (int i = 0; i < sTableArray.length; i++) { - TableName table = sTableArray[i]; - BackupManifest manifest = backupManifestMap.get(table); - // Get the image list of this backup for restore in time order from old - // to new. - List list = new ArrayList(); - list.add(manifest.getBackupImage()); - List depList = manifest.getDependentListByTable(table); - list.addAll(depList); - TreeSet restoreList = new TreeSet(list); - LOG.debug("need to clear merged Image. 
to be implemented in future jira"); - restoreImages(svc, restoreList.iterator(), table, tTableArray[i], truncateIfExists); - restoreImageSet.addAll(restoreList); - - if (restoreImageSet != null && !restoreImageSet.isEmpty()) { - LOG.info("Restore includes the following image(s):"); - for (BackupImage image : restoreImageSet) { - LOG.info("Backup: " - + image.getBackupId() - + " " - + HBackupFileSystem.getTableBackupDir(image.getRootDir(), image.getBackupId(), - table)); - } - } - } - } catch (Exception e) { - LOG.error("Failed", e); - throw new IOException(e); - } - LOG.debug("restoreStage finished"); - } - - @Override - protected Flow executeFromState(final MasterProcedureEnv env, final RestoreTablesState state) - throws InterruptedException { - if (conf == null) { - conf = env.getMasterConfiguration(); - } - if (LOG.isTraceEnabled()) { - LOG.trace(this + " execute state=" + state); - } - TableName[] tTableArray = tTableList.toArray(new TableName[tTableList.size()]); - try { - switch (state) { - case VALIDATION: - - // check the target tables - checkTargetTables(env.getMasterServices().getConnection(), - env.getMasterServices().getTableStateManager(), tTableArray, isOverwrite); - - setNextState(RestoreTablesState.RESTORE_IMAGES); - break; - case RESTORE_IMAGES: - TableName[] sTableArray = sTableList.toArray(new TableName[sTableList.size()]); - HashMap backupManifestMap = new HashMap<>(); - // check and load backup image manifest for the tables - Path rootPath = new Path(targetRootDir); - HBackupFileSystem.checkImageManifestExist(backupManifestMap, sTableArray, conf, rootPath, - backupId); - restoreStage(env.getMasterServices(), backupManifestMap, sTableArray, - tTableArray, isOverwrite); - - return Flow.NO_MORE_STATE; - - default: - throw new UnsupportedOperationException("unhandled state=" + state); - } - } catch (IOException e) { - setFailure("restore-table", e); - } - return Flow.HAS_MORE_STATE; - } - - @Override - protected void rollbackState(final MasterProcedureEnv env, final RestoreTablesState state) - throws IOException { - } - - @Override - protected RestoreTablesState getState(final int stateId) { - return RestoreTablesState.valueOf(stateId); - } - - @Override - protected int getStateId(final RestoreTablesState state) { - return state.getNumber(); - } - - @Override - protected RestoreTablesState getInitialState() { - return RestoreTablesState.VALIDATION; - } - - @Override - protected void setNextState(final RestoreTablesState state) { - if (aborted.get()) { - setAbortFailure("snapshot-table", "abort requested"); - } else { - super.setNextState(state); - } - } - - @Override - public boolean abort(final MasterProcedureEnv env) { - aborted.set(true); - return true; - } - - @Override - public void toStringClassDetails(StringBuilder sb) { - sb.append(getClass().getSimpleName()); - sb.append(" (targetRootDir="); - sb.append(targetRootDir); - sb.append(" isOverwrite= "); - sb.append(isOverwrite); - sb.append(" backupId= "); - sb.append(backupId); - sb.append(")"); - } - - MasterProtos.RestoreTablesRequest toRestoreTables() { - MasterProtos.RestoreTablesRequest.Builder bldr = MasterProtos.RestoreTablesRequest.newBuilder(); - bldr.setOverwrite(isOverwrite).setBackupId(backupId); - bldr.setBackupRootDir(targetRootDir); - for (TableName table : sTableList) { - bldr.addTables(ProtobufUtil.toProtoTableName(table)); - } - for (TableName table : tTableList) { - bldr.addTargetTables(ProtobufUtil.toProtoTableName(table)); - } - return bldr.build(); - } - - @Override - public void 
serializeStateData(final OutputStream stream) throws IOException { - super.serializeStateData(stream); - - MasterProtos.RestoreTablesRequest restoreTables = toRestoreTables(); - restoreTables.writeDelimitedTo(stream); - } - - @Override - public void deserializeStateData(final InputStream stream) throws IOException { - super.deserializeStateData(stream); - - MasterProtos.RestoreTablesRequest proto = - MasterProtos.RestoreTablesRequest.parseDelimitedFrom(stream); - backupId = proto.getBackupId(); - targetRootDir = proto.getBackupRootDir(); - isOverwrite = proto.getOverwrite(); - sTableList = new ArrayList<>(proto.getTablesList().size()); - for (HBaseProtos.TableName table : proto.getTablesList()) { - sTableList.add(ProtobufUtil.toTableName(table)); - } - tTableList = new ArrayList<>(proto.getTargetTablesList().size()); - for (HBaseProtos.TableName table : proto.getTargetTablesList()) { - tTableList.add(ProtobufUtil.toTableName(table)); - } - } - - @Override - public TableName getTableName() { - return TableName.BACKUP_TABLE_NAME; - } - - @Override - public TableOperationType getTableOperationType() { - return TableOperationType.RESTORE; - } - - @Override - protected boolean acquireLock(final MasterProcedureEnv env) { - if (env.waitInitialized(this)) { - return false; - } - return env.getProcedureQueue().tryAcquireTableExclusiveLock(this, TableName.BACKUP_TABLE_NAME); - } - - @Override - protected void releaseLock(final MasterProcedureEnv env) { - env.getProcedureQueue().releaseTableExclusiveLock(this, TableName.BACKUP_TABLE_NAME); - } -} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitter.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitter.java new file mode 100644 index 0000000..c69a335 --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/HFileSplitter.java @@ -0,0 +1,190 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.backup.mapreduce; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.KeyValue.Type; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.HFileInputFormat2; +import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2; +import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer; +import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +/** + * A tool to split HFiles into new region boundaries as a M/R job. + * The tool generates HFiles for later bulk importing, + */ +@InterfaceAudience.Public +@InterfaceStability.Stable +public class HFileSplitter extends Configured implements Tool { + private static final Log LOG = LogFactory.getLog(HFileSplitter.class); + final static String NAME = "HFileSplitter"; + public final static String BULK_OUTPUT_CONF_KEY = "hfile.bulk.output"; + public final static String TABLES_KEY = "hfile.input.tables"; + public final static String TABLE_MAP_KEY = "hfile.input.tablesmap"; + private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name"; + + public HFileSplitter(){ + } + + protected HFileSplitter(final Configuration c) { + super(c); + } + + /** + * A mapper that just writes out cells. + * This one can be used together with {@link KeyValueSortReducer} + */ + static class HFileCellMapper + extends Mapper { + + @Override + public void map(NullWritable key, KeyValue value, Context context) throws IOException, + InterruptedException { + // Convert value to KeyValue if subclass + if (!value.getClass().equals(KeyValue.class)) { + value = + new KeyValue(value.getRowArray(), value.getRowOffset(), (int) value.getRowLength(), + value.getFamilyArray(), value.getFamilyOffset(), (int) value.getFamilyLength(), + value.getQualifierArray(), value.getQualifierOffset(), + (int) value.getQualifierLength(), value.getTimestamp(), Type.codeToType(value + .getTypeByte()), value.getValueArray(), value.getValueOffset(), + value.getValueLength()); + } + context.write(new ImmutableBytesWritable(CellUtil.cloneRow(value)), value); + } + + @Override + public void setup(Context context) throws IOException { + // do nothing + } + } + + + + /** + * Sets up the actual job. + * + * @param args The command line parameters. + * @return The newly created job. + * @throws IOException When setting up the job fails. 
+ */ + public Job createSubmittableJob(String[] args) throws IOException { + Configuration conf = getConf(); + String inputDirs = args[0]; + String tabName = args[1]; + conf.setStrings(TABLES_KEY, tabName); + Job job = + Job.getInstance(conf, + conf.get(JOB_NAME_CONF_KEY, NAME + "_" + EnvironmentEdgeManager.currentTime())); + job.setJarByClass(HFileSplitter.class); + FileInputFormat.addInputPaths(job, inputDirs); + job.setInputFormatClass(HFileInputFormat2.class); + job.setMapOutputKeyClass(ImmutableBytesWritable.class); + String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY); + if (hfileOutPath != null) { + LOG.debug("add incremental job :" + hfileOutPath + " from " + inputDirs); + TableName tableName = TableName.valueOf(tabName); + job.setMapperClass(HFileCellMapper.class); + job.setReducerClass(KeyValueSortReducer.class); + Path outputDir = new Path(hfileOutPath); + FileOutputFormat.setOutputPath(job, outputDir); + job.setMapOutputValueClass(KeyValue.class); + try (Connection conn = ConnectionFactory.createConnection(conf); + Table table = conn.getTable(tableName); + RegionLocator regionLocator = conn.getRegionLocator(tableName)) { + HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator); + } + LOG.debug("success configuring load incremental job"); + + TableMapReduceUtil.addDependencyJars(job.getConfiguration(), + com.google.common.base.Preconditions.class); + } else { + throw new IOException("No bulk output directory specified"); + } + return job; + } + + + /** + * Print usage + * @param errorMsg Error message. Can be null. + */ + private void usage(final String errorMsg) { + if (errorMsg != null && errorMsg.length() > 0) { + System.err.println("ERROR: " + errorMsg); + } + System.err.println("Usage: " + NAME + " [options] "); + System.err.println("Read all HFile's for
<table> and split them to <table> region boundaries."); + System.err.println("<table>
table to load.\n"); + System.err.println("To generate HFiles for a bulk data load, pass the option:"); + System.err.println(" -D" + BULK_OUTPUT_CONF_KEY + "=/path/for/output"); + System.err.println("Other options:"); + System.err.println(" -D " + JOB_NAME_CONF_KEY + + "=jobName - use the specified mapreduce job name for the HFile splitter"); + System.err.println("For performance also consider the following options:\n" + + " -Dmapreduce.map.speculative=false\n" + " -Dmapreduce.reduce.speculative=false"); + } + + /** + * Main entry point. + * + * @param args The command line parameters. + * @throws Exception When running the job fails. + */ + public static void main(String[] args) throws Exception { + int ret = ToolRunner.run(new HFileSplitter(HBaseConfiguration.create()), args); + System.exit(ret); + } + + @Override + public int run(String[] args) throws Exception { + if (args.length < 2) { + usage("Wrong number of arguments: " + args.length); + System.exit(-1); + } + Job job = createSubmittableJob(args); + int result =job.waitForCompletion(true) ? 0 : 1; + return result; + } +} + diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreService.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreService.java index c47d6ed..d43db41 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreService.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreService.java @@ -1,19 +1,12 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. 
*/ package org.apache.hadoop.hbase.backup.mapreduce; @@ -24,54 +17,67 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.backup.IncrementalRestoreService; +import org.apache.hadoop.hbase.backup.RestoreService; import org.apache.hadoop.hbase.backup.util.BackupServerUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; import org.apache.hadoop.hbase.mapreduce.WALPlayer; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.util.Tool; @InterfaceAudience.Private @InterfaceStability.Evolving -public class MapReduceRestoreService implements IncrementalRestoreService { +public class MapReduceRestoreService implements RestoreService { public static final Log LOG = LogFactory.getLog(MapReduceRestoreService.class); - private WALPlayer player; + private Tool player; + private Configuration conf; public MapReduceRestoreService() { - this.player = new WALPlayer(); } @Override - public void run(Path[] logDirPaths, TableName[] tableNames, TableName[] newTableNames) - throws IOException { + public void run(Path[] dirPaths, TableName[] tableNames, TableName[] newTableNames, + boolean fullBackupRestore) throws IOException { - // WALPlayer reads all files in arbitrary directory structure and creates a Map task for each - // log file - String logDirs = StringUtils.join(logDirPaths, ","); - LOG.info("Restore incremental backup from directory " + logDirs + " from hbase tables " - + BackupServerUtil.join(tableNames) + " to tables " + BackupServerUtil.join(newTableNames)); + String bulkOutputConfKey; + + if (fullBackupRestore) { + player = new HFileSplitter(); + bulkOutputConfKey = HFileSplitter.BULK_OUTPUT_CONF_KEY; + } else { + player = new WALPlayer(); + bulkOutputConfKey = WALPlayer.BULK_OUTPUT_CONF_KEY; + } + // Player reads all files in arbitrary directory structure and creates + // a Map task for each file + String dirs = StringUtils.join(dirPaths, ","); + + if (LOG.isDebugEnabled()) { + LOG.debug("Restore " + (fullBackupRestore ? 
"full" : "incremental") + + " backup from directory " + dirs + " from hbase tables " + + BackupServerUtil.join(tableNames) + " to tables " + + BackupServerUtil.join(newTableNames)); + } for (int i = 0; i < tableNames.length; i++) { - - LOG.info("Restore "+ tableNames[i] + " into "+ newTableNames[i]); - + + LOG.info("Restore " + tableNames[i] + " into " + newTableNames[i]); + Path bulkOutputPath = getBulkOutputDir(getFileNameCompatibleString(newTableNames[i])); - String[] playerArgs = - { logDirs, tableNames[i].getNameAsString() }; + Configuration conf = getConf(); + conf.set(bulkOutputConfKey, bulkOutputPath.toString()); + String[] playerArgs = { dirs, tableNames[i].getNameAsString() }; int result = 0; int loaderResult = 0; try { - Configuration conf = getConf(); - conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString()); - player.setConf(getConf()); + + player.setConf(getConf()); result = player.run(playerArgs); if (succeeded(result)) { // do bulk load @@ -81,38 +87,37 @@ public class MapReduceRestoreService implements IncrementalRestoreService { } String[] args = { bulkOutputPath.toString(), newTableNames[i].getNameAsString() }; loaderResult = loader.run(args); - if(failed(loaderResult)) { - throw new IOException("Can not restore from backup directory " + logDirs - + " (check Hadoop and HBase logs). Bulk loader return code =" + loaderResult); + + if (failed(loaderResult)) { + throw new IOException("Can not restore from backup directory " + dirs + + " (check Hadoop and HBase logs). Bulk loader return code =" + loaderResult); } } else { - throw new IOException("Can not restore from backup directory " + logDirs - + " (check Hadoop/MR and HBase logs). WALPlayer return code =" + result); + throw new IOException("Can not restore from backup directory " + dirs + + " (check Hadoop/MR and HBase logs). Player return code =" + result); } LOG.debug("Restore Job finished:" + result); } catch (Exception e) { - throw new IOException("Can not restore from backup directory " + logDirs + throw new IOException("Can not restore from backup directory " + dirs + " (check Hadoop and HBase logs) ", e); } } } - private String getFileNameCompatibleString(TableName table) - { - return table.getNamespaceAsString() +"-"+ table.getQualifierAsString(); + private String getFileNameCompatibleString(TableName table) { + return table.getNamespaceAsString() + "-" + table.getQualifierAsString(); } - + private boolean failed(int result) { return result != 0; } - + private boolean succeeded(int result) { return result == 0; } - private LoadIncrementalHFiles createLoader() - throws IOException { + private LoadIncrementalHFiles createLoader() throws IOException { // set configuration for restore: // LoadIncrementalHFile needs more time // hbase.rpc.timeout600000 @@ -120,10 +125,11 @@ public class MapReduceRestoreService implements IncrementalRestoreService { Integer milliSecInHour = 3600000; Configuration conf = new Configuration(getConf()); conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, milliSecInHour); - + // By default, it is 32 and loader will fail if # of files in any region exceed this // limit. Bad for snapshot restore. 
conf.setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, Integer.MAX_VALUE); + conf.set(LoadIncrementalHFiles.IGNORE_UNMATCHED_CF_CONF_KEY, "yes"); LoadIncrementalHFiles loader = null; try { loader = new LoadIncrementalHFiles(conf); @@ -133,27 +139,26 @@ public class MapReduceRestoreService implements IncrementalRestoreService { return loader; } - private Path getBulkOutputDir(String tableName) throws IOException - { + private Path getBulkOutputDir(String tableName) throws IOException { Configuration conf = getConf(); FileSystem fs = FileSystem.get(conf); - String tmp = conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, - HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY); - Path path = new Path(tmp + Path.SEPARATOR + "bulk_output-"+tableName + "-" - + EnvironmentEdgeManager.currentTime()); + String tmp = + conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY); + Path path = + new Path(tmp + Path.SEPARATOR + "bulk_output-" + tableName + "-" + + EnvironmentEdgeManager.currentTime()); fs.deleteOnExit(path); return path; } - @Override public Configuration getConf() { - return player.getConf(); + return conf; } @Override public void setConf(Configuration conf) { - this.player.setConf(conf); + this.conf = conf; } } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/FullTableBackupProcedure.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/FullTableBackupProcedure.java index c56aaf3..94e991f 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/FullTableBackupProcedure.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/FullTableBackupProcedure.java @@ -65,7 +65,6 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescriptio import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.hadoop.security.UserGroupInformation; @InterfaceAudience.Private public class FullTableBackupProcedure diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/RestoreTablesProcedure.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/RestoreTablesProcedure.java new file mode 100644 index 0000000..2678278 --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/master/RestoreTablesProcedure.java @@ -0,0 +1,387 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.backup.master; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.MetaTableAccessor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.backup.BackupType; +import org.apache.hadoop.hbase.backup.HBackupFileSystem; +import org.apache.hadoop.hbase.backup.impl.BackupManifest; +import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage; +import org.apache.hadoop.hbase.backup.util.RestoreServerUtil; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.TableState; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.TableStateManager; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface; +import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RestoreTablesState; + +@InterfaceAudience.Private +public class RestoreTablesProcedure + extends StateMachineProcedure + implements TableProcedureInterface { + private static final Log LOG = LogFactory.getLog(RestoreTablesProcedure.class); + + private final AtomicBoolean aborted = new AtomicBoolean(false); + private Configuration conf; + private String backupId; + private List sTableList; + private List tTableList; + private String targetRootDir; + private boolean isOverwrite; + + public RestoreTablesProcedure() { + // Required by the Procedure framework to create the procedure on replay + } + + public RestoreTablesProcedure(final MasterProcedureEnv env, + final String targetRootDir, String backupId, List sTableList, + List tTableList, boolean isOverwrite) throws IOException { + this.targetRootDir = targetRootDir; + this.backupId = backupId; + this.sTableList = sTableList; + this.tTableList = tTableList; + if (tTableList == null || tTableList.isEmpty()) { + this.tTableList = sTableList; + } + this.isOverwrite = isOverwrite; + this.setOwner(env.getRequestUser().getUGI().getShortUserName()); + } + + @Override + public byte[] getResult() { + return null; + } + + /** + * Validate target Tables + * @param conn connection + * @param mgr table state manager + * @param tTableArray: target tables + * @param isOverwrite overwrite existing table + * @throws IOException exception + */ + private void checkTargetTables(Connection conn, TableStateManager mgr, TableName[] tTableArray, + boolean isOverwrite) + throws IOException { + ArrayList existTableList = new ArrayList<>(); + ArrayList disabledTableList = new ArrayList<>(); + + // check if the tables already exist + for (TableName tableName : tTableArray) { + if (MetaTableAccessor.tableExists(conn, tableName)) { + existTableList.add(tableName); + if (mgr.isTableState(tableName, 
TableState.State.DISABLED, TableState.State.DISABLING)) { + disabledTableList.add(tableName); + } + } else { + LOG.info("HBase table " + tableName + + " does not exist. It will be created during restore process"); + } + } + + if (existTableList.size() > 0) { + if (!isOverwrite) { + LOG.error("Existing table (" + existTableList + ") found in the restore target, please add " + + "\"-overwrite\" option in the command if you mean to restore to these existing tables"); + throw new IOException("Existing table found in target while no \"-overwrite\" " + + "option found"); + } else { + if (disabledTableList.size() > 0) { + LOG.error("Found offline table in the restore target, " + + "please enable them before restore with \"-overwrite\" option"); + LOG.info("Offline table list in restore target: " + disabledTableList); + throw new IOException( + "Found offline table in the target when restore with \"-overwrite\" option"); + } + } + } + } + + /** + * Restore operation handle each backupImage in array + * @param svc: master services + * @param images: array BackupImage + * @param sTable: table to be restored + * @param tTable: table to be restored to + * @param truncateIfExists: truncate table + * @throws IOException exception + */ + + private void restoreImages(MasterServices svc, BackupImage[] images, TableName sTable, TableName tTable, + boolean truncateIfExists) throws IOException { + + // First image MUST be image of a FULL backup + BackupImage image = images[0]; + String rootDir = image.getRootDir(); + String backupId = image.getBackupId(); + Path backupRoot = new Path(rootDir); + RestoreServerUtil restoreTool = new RestoreServerUtil(conf, backupRoot, backupId); + Path tableBackupPath = HBackupFileSystem.getTableBackupPath(sTable, backupRoot, backupId); + String lastIncrBackupId = images.length == 1 ? null : images[images.length - 1].getBackupId(); + // We need hFS only for full restore (see the code) + BackupManifest manifest = HBackupFileSystem.getManifest(sTable, conf, backupRoot, backupId); + if (manifest.getType() == BackupType.FULL) { + LOG.info("Restoring '" + sTable + "' to '" + tTable + "' from full" + + " backup image " + tableBackupPath.toString()); + restoreTool.fullRestoreTable(svc, tableBackupPath, sTable, tTable, truncateIfExists, + lastIncrBackupId); + } else { // incremental Backup + throw new IOException("Unexpected backup type " + image.getType()); + } + + if (images.length == 1) { + // full backup restore done + return; + } + + List dirList = new ArrayList(); + // add full backup path + // full backup path comes first + for (int i = 1; i < images.length; i++) { + BackupImage im = images[i]; + String logBackupDir = HBackupFileSystem.getLogBackupDir(im.getRootDir(), im.getBackupId()); + dirList.add(new Path(logBackupDir)); + } + + String dirs = StringUtils.join(dirList, ","); + LOG.info("Restoring '" + sTable + "' to '" + tTable + "' from log dirs: " + dirs); + Path[] paths = new Path[dirList.size()]; + dirList.toArray(paths); + restoreTool.incrementalRestoreTable(svc, tableBackupPath, paths, new TableName[] { sTable }, + new TableName[] { tTable }, lastIncrBackupId); + LOG.info(sTable + " has been successfully restored to " + tTable); + + } + + /** + * Restore operation. 
Stage 2: resolved Backup Image dependency + * @param svc: master services + * @param backupManifestMap : tableName, Manifest + * @param sTableArray The array of tables to be restored + * @param tTableArray The array of mapping tables to restore to + * @return set of BackupImages restored + * @throws IOException exception + */ + private void restore(MasterServices svc, HashMap backupManifestMap, + TableName[] sTableArray, TableName[] tTableArray, boolean isOverwrite) throws IOException { + TreeSet restoreImageSet = new TreeSet(); + boolean truncateIfExists = isOverwrite; + try { + for (int i = 0; i < sTableArray.length; i++) { + TableName table = sTableArray[i]; + BackupManifest manifest = backupManifestMap.get(table); + // Get the image list of this backup for restore in time order from old + // to new. + List list = new ArrayList(); + list.add(manifest.getBackupImage()); + TreeSet set = new TreeSet(list); + List depList = manifest.getDependentListByTable(table); + set.addAll(depList); + BackupImage[] arr = new BackupImage[set.size()]; + set.toArray(arr); + restoreImages(svc, arr, table, tTableArray[i], truncateIfExists); + restoreImageSet.addAll(list); + if (restoreImageSet != null && !restoreImageSet.isEmpty()) { + LOG.info("Restore includes the following image(s):"); + for (BackupImage image : restoreImageSet) { + LOG.info("Backup: " + + image.getBackupId() + + " " + + HBackupFileSystem.getTableBackupDir(image.getRootDir(), image.getBackupId(), + table)); + } + } + } + } catch (Exception e) { + LOG.error("Failed", e); + throw new IOException(e); + } + LOG.debug("restoreStage finished"); + } + + @Override + protected Flow executeFromState(final MasterProcedureEnv env, final RestoreTablesState state) + throws InterruptedException { + if (conf == null) { + conf = env.getMasterConfiguration(); + } + if (LOG.isTraceEnabled()) { + LOG.trace(this + " execute state=" + state); + } + TableName[] tTableArray = tTableList.toArray(new TableName[tTableList.size()]); + try { + switch (state) { + case VALIDATION: + + // check the target tables + checkTargetTables(env.getMasterServices().getConnection(), + env.getMasterServices().getTableStateManager(), tTableArray, isOverwrite); + + setNextState(RestoreTablesState.RESTORE_IMAGES); + break; + case RESTORE_IMAGES: + TableName[] sTableArray = sTableList.toArray(new TableName[sTableList.size()]); + HashMap backupManifestMap = new HashMap<>(); + // check and load backup image manifest for the tables + Path rootPath = new Path(targetRootDir); + HBackupFileSystem.checkImageManifestExist(backupManifestMap, sTableArray, conf, rootPath, + backupId); + restore(env.getMasterServices(), backupManifestMap, sTableArray, tTableArray, isOverwrite); + return Flow.NO_MORE_STATE; + default: + throw new UnsupportedOperationException("unhandled state=" + state); + } + } catch (IOException e) { + setFailure("restore-table", e); + } + return Flow.HAS_MORE_STATE; + } + + @Override + protected void rollbackState(final MasterProcedureEnv env, final RestoreTablesState state) + throws IOException { + } + + @Override + protected RestoreTablesState getState(final int stateId) { + return RestoreTablesState.valueOf(stateId); + } + + @Override + protected int getStateId(final RestoreTablesState state) { + return state.getNumber(); + } + + @Override + protected RestoreTablesState getInitialState() { + return RestoreTablesState.VALIDATION; + } + + @Override + protected void setNextState(final RestoreTablesState state) { + if (aborted.get()) { + setAbortFailure("snapshot-table", "abort 
requested"); + } else { + super.setNextState(state); + } + } + + @Override + public boolean abort(final MasterProcedureEnv env) { + aborted.set(true); + return true; + } + + @Override + public void toStringClassDetails(StringBuilder sb) { + sb.append(getClass().getSimpleName()); + sb.append(" (targetRootDir="); + sb.append(targetRootDir); + sb.append(" isOverwrite= "); + sb.append(isOverwrite); + sb.append(" backupId= "); + sb.append(backupId); + sb.append(")"); + } + + MasterProtos.RestoreTablesRequest toRestoreTables() { + MasterProtos.RestoreTablesRequest.Builder bldr = MasterProtos.RestoreTablesRequest.newBuilder(); + bldr.setOverwrite(isOverwrite).setBackupId(backupId); + bldr.setBackupRootDir(targetRootDir); + for (TableName table : sTableList) { + bldr.addTables(ProtobufUtil.toProtoTableName(table)); + } + for (TableName table : tTableList) { + bldr.addTargetTables(ProtobufUtil.toProtoTableName(table)); + } + return bldr.build(); + } + + @Override + public void serializeStateData(final OutputStream stream) throws IOException { + super.serializeStateData(stream); + + MasterProtos.RestoreTablesRequest restoreTables = toRestoreTables(); + restoreTables.writeDelimitedTo(stream); + } + + @Override + public void deserializeStateData(final InputStream stream) throws IOException { + super.deserializeStateData(stream); + + MasterProtos.RestoreTablesRequest proto = + MasterProtos.RestoreTablesRequest.parseDelimitedFrom(stream); + backupId = proto.getBackupId(); + targetRootDir = proto.getBackupRootDir(); + isOverwrite = proto.getOverwrite(); + sTableList = new ArrayList<>(proto.getTablesList().size()); + for (HBaseProtos.TableName table : proto.getTablesList()) { + sTableList.add(ProtobufUtil.toTableName(table)); + } + tTableList = new ArrayList<>(proto.getTargetTablesList().size()); + for (HBaseProtos.TableName table : proto.getTargetTablesList()) { + tTableList.add(ProtobufUtil.toTableName(table)); + } + } + + @Override + public TableName getTableName() { + return TableName.BACKUP_TABLE_NAME; + } + + @Override + public TableOperationType getTableOperationType() { + return TableOperationType.RESTORE; + } + + @Override + protected boolean acquireLock(final MasterProcedureEnv env) { + if (env.waitInitialized(this)) { + return false; + } + return env.getProcedureQueue().tryAcquireTableExclusiveLock(this, TableName.BACKUP_TABLE_NAME); + } + + @Override + protected void releaseLock(final MasterProcedureEnv env) { + env.getProcedureQueue().releaseTableExclusiveLock(this, TableName.BACKUP_TABLE_NAME); + } +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreServerUtil.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreServerUtil.java index 37bfcc2..3da7860 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreServerUtil.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreServerUtil.java @@ -41,10 +41,9 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.backup.BackupInfo; import org.apache.hadoop.hbase.backup.BackupRestoreServerFactory; import org.apache.hadoop.hbase.backup.HBackupFileSystem; -import org.apache.hadoop.hbase.backup.IncrementalRestoreService; +import org.apache.hadoop.hbase.backup.RestoreService; import org.apache.hadoop.hbase.backup.RestoreRequest; import org.apache.hadoop.hbase.backup.impl.BackupManifest; 
import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage; @@ -115,6 +114,7 @@ public class RestoreServerUtil { */ Path getTableArchivePath(TableName tableName) throws IOException { + Path baseDir = new Path(HBackupFileSystem.getTableBackupPath(tableName, backupRootPath, backupId), HConstants.HFILE_ARCHIVE_DIRECTORY); Path dataDir = new Path(baseDir, HConstants.BASE_NAMESPACE_DIR); @@ -148,8 +148,33 @@ public class RestoreServerUtil { return regionDirList; } + /** + * Gets region list + * @param tableName table name + * @param backupId backup id + * @return RegionList region list + * @throws FileNotFoundException exception + * @throws IOException exception + */ + ArrayList getRegionList(TableName tableName, String backupId) throws FileNotFoundException, + IOException { + Path tableArchivePath = + new Path(BackupClientUtil.getTableBackupDir(backupRootPath.toString(), + backupId, tableName)); + + ArrayList regionDirList = new ArrayList(); + FileStatus[] children = fs.listStatus(tableArchivePath); + for (FileStatus childStatus : children) { + // here child refer to each region(Name) + Path child = childStatus.getPath(); + regionDirList.add(child); + } + return regionDirList; + } + static void modifyTableSync(MasterServices svc, HTableDescriptor desc) throws IOException { svc.modifyTable(desc.getTableName(), desc, HConstants.NO_NONCE, HConstants.NO_NONCE); + @SuppressWarnings("serial") Pair status = new Pair() {{ setFirst(0); setSecond(0); @@ -234,16 +259,16 @@ public class RestoreServerUtil { LOG.info("Changed " + newTableDescriptor.getTableName() + " to: " + newTableDescriptor); } } - IncrementalRestoreService restoreService = - BackupRestoreServerFactory.getIncrementalRestoreService(conf); + RestoreService restoreService = + BackupRestoreServerFactory.getRestoreService(conf); - restoreService.run(logDirs, tableNames, newTableNames); + restoreService.run(logDirs, tableNames, newTableNames, false); } public void fullRestoreTable(MasterServices svc, Path tableBackupPath, TableName tableName, - TableName newTableName, boolean converted, boolean truncateIfExists, String lastIncrBackupId) + TableName newTableName, boolean truncateIfExists, String lastIncrBackupId) throws IOException { - restoreTableAndCreate(svc, tableName, newTableName, tableBackupPath, converted, truncateIfExists, + restoreTableAndCreate(svc, tableName, newTableName, tableBackupPath, truncateIfExists, lastIncrBackupId); } @@ -355,20 +380,19 @@ public class RestoreServerUtil { if (lastIncrBackupId != null) { String target = BackupClientUtil.getTableBackupDir(backupRootPath.toString(), lastIncrBackupId, tableName); - // Path target = new Path(info.getBackupStatus(tableName).getTargetDir()); return FSTableDescriptors.getTableDescriptorFromFs(fileSys, new Path(target)).getHTableDescriptor(); } return null; } - private void restoreTableAndCreate(MasterServices svc, TableName tableName, TableName newTableName, - Path tableBackupPath, boolean converted, boolean truncateIfExists, String lastIncrBackupId) - throws IOException { + private void restoreTableAndCreate(MasterServices svc, TableName tableName, + TableName newTableName, Path tableBackupPath, boolean truncateIfExists, + String lastIncrBackupId) throws IOException { if (newTableName == null || newTableName.equals("")) { newTableName = tableName; } - + boolean fullBackupRestoreOnly = lastIncrBackupId == null; FileSystem fileSys = tableBackupPath.getFileSystem(this.conf); HTableDescriptor tableDescriptor = getTableDescriptor(fileSys, tableName, lastIncrBackupId); @@ 
-384,7 +408,7 @@ public class RestoreServerUtil { if (snapshotMap.get(tableName) != null) { SnapshotDescription desc = SnapshotDescriptionUtils.readSnapshotInfo(fileSys, tableSnapshotPath); - SnapshotManifest manifest = SnapshotManifest.open(conf,fileSys,tableSnapshotPath,desc); + SnapshotManifest manifest = SnapshotManifest.open(conf, fileSys, tableSnapshotPath, desc); tableDescriptor = manifest.getTableDescriptor(); LOG.debug("obtained descriptor from " + manifest); } else { @@ -395,9 +419,9 @@ public class RestoreServerUtil { if (tableDescriptor == null) { LOG.debug("Found no table descriptor in the snapshot dir, previous schema was lost"); } - } else if (converted) { - // first check if this is a converted backup image - LOG.error("convert will be supported in a future jira"); + } else { + throw new IOException("Table snapshot directory: " + tableSnapshotPath + + " does not exist."); } } @@ -405,13 +429,13 @@ public class RestoreServerUtil { if (tableArchivePath == null) { if (tableDescriptor != null) { // find table descriptor but no archive dir => the table is empty, create table and exit - if(LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { LOG.debug("find table descriptor but no archive dir for table " + tableName + ", will only create table"); } tableDescriptor.setName(newTableName); - checkAndCreateTable(svc, tableBackupPath, tableName, newTableName, null, - tableDescriptor, truncateIfExists); + checkAndCreateTable(svc, tableBackupPath, tableName, newTableName, null, tableDescriptor, + truncateIfExists); return; } else { throw new IllegalStateException("Cannot restore hbase table because directory '" @@ -426,50 +450,61 @@ public class RestoreServerUtil { tableDescriptor.setName(newTableName); } - if (!converted) { - // record all region dirs: - // load all files in dir - try { - ArrayList regionPathList = getRegionList(tableName); - - // should only try to create the table with all region informations, so we could pre-split - // the regions in fine grain - checkAndCreateTable(svc, tableBackupPath, tableName, newTableName, regionPathList, - tableDescriptor, truncateIfExists); - if (tableArchivePath != null) { - // start real restore through bulkload - // if the backup target is on local cluster, special action needed - Path tempTableArchivePath = checkLocalAndBackup(tableArchivePath); - if (tempTableArchivePath.equals(tableArchivePath)) { - if(LOG.isDebugEnabled()) { - LOG.debug("TableArchivePath for bulkload using existPath: " + tableArchivePath); - } - } else { - regionPathList = getRegionList(tempTableArchivePath); // point to the tempDir - if(LOG.isDebugEnabled()) { - LOG.debug("TableArchivePath for bulkload using tempPath: " + tempTableArchivePath); - } - } + // record all region dirs: + // load all files in dir + try { + // Region splits for last incremental backup id + // We use it to create table with pre-splits + ArrayList regionPathList = + fullBackupRestoreOnly ? 
getRegionList(tableName) : getRegionList(tableName, + lastIncrBackupId); + + // should only try to create the table with all region informations, so we could pre-split + // the regions in fine grain + checkAndCreateTable(svc, tableBackupPath, tableName, newTableName, regionPathList, + tableDescriptor, truncateIfExists); + + // Now get region splits from full backup + regionPathList = getRegionList(tableName); + + // start real restore through bulkload + // if the backup target is on local cluster, special action needed + Path tempTableArchivePath = checkLocalAndBackup(tableArchivePath); + if (tempTableArchivePath.equals(tableArchivePath)) { + if (LOG.isDebugEnabled()) { + LOG.debug("TableArchivePath for bulkload using existPath: " + tableArchivePath); + } + } else { + regionPathList = getRegionList(tempTableArchivePath); // point to the tempDir + if (LOG.isDebugEnabled()) { + LOG.debug("TableArchivePath for bulkload using tempPath: " + tempTableArchivePath); + } + } - LoadIncrementalHFiles loader = createLoader(tempTableArchivePath, false); - for (Path regionPath : regionPathList) { - String regionName = regionPath.toString(); - if(LOG.isDebugEnabled()) { - LOG.debug("Restoring HFiles from directory " + regionName); - } - String[] args = { regionName, newTableName.getNameAsString() }; - loader.run(args); + if (fullBackupRestoreOnly) { + LoadIncrementalHFiles loader = createLoader(tempTableArchivePath, false); + for (Path regionPath : regionPathList) { + String regionName = regionPath.toString(); + if (LOG.isDebugEnabled()) { + LOG.debug("Restoring HFiles from directory " + regionName); } + String[] args = { regionName, newTableName.getNameAsString() }; + loader.run(args); } - // we do not recovered edits - } catch (Exception e) { - throw new IllegalStateException("Cannot restore hbase table", e); + } else { + // Run restore service + Path[] dirs = new Path[regionPathList.size()]; + regionPathList.toArray(dirs); + RestoreService restoreService = + BackupRestoreServerFactory.getRestoreService(conf); + + restoreService.run(dirs, new TableName[] { tableName }, new TableName[] { newTableName }, + true); } - } else { - LOG.debug("convert will be supported in a future jira"); + } catch (Exception e) { + throw new IllegalStateException("Cannot restore hbase table", e); } } - /** * Gets region list * @param tableArchivePath table archive path diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat2.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat2.java new file mode 100644 index 0000000..dfcd7be --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat2.java @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.mapreduce; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.HFile; +import org.apache.hadoop.hbase.io.hfile.HFile.Reader; +import org.apache.hadoop.hbase.io.hfile.HFileScanner; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Simple input format for HFiles. + * This code was borrowed from Apache Crunch project. + * Updated to the recent version of HBase. + */ +public class HFileInputFormat2 extends FileInputFormat { + + private static final Logger LOG = LoggerFactory.getLogger(HFileInputFormat2.class); + + /** + * File filter that removes all "hidden" files. This might be something worth removing from + * a more general purpose utility; it accounts for the presence of metadata files created + * in the way we're doing exports. + */ + static final PathFilter HIDDEN_FILE_FILTER = new PathFilter() { + public boolean accept(Path p) { + String name = p.getName(); + return !name.startsWith("_") && !name.startsWith("."); + } + }; + + /** + * Record reader for HFiles. + */ + private static class HFileRecordReader extends RecordReader { + + private Reader in; + protected Configuration conf; + private HFileScanner scanner; + + /** + * A private cache of the key value so it doesn't need to be loaded twice from the scanner. + */ + private Cell value = null; + private long count; + private boolean seeked = false; + + @Override + public void initialize(InputSplit split, TaskAttemptContext context) + throws IOException, InterruptedException { + FileSplit fileSplit = (FileSplit) split; + conf = context.getConfiguration(); + Path path = fileSplit.getPath(); + FileSystem fs = path.getFileSystem(conf); + LOG.info("Initialize HFileRecordReader for {}", path); + this.in = HFile.createReader(fs, path, new CacheConfig(conf), conf); + + // The file info must be loaded before the scanner can be used. + // This seems like a bug in HBase, but it's easily worked around. 
+ this.in.loadFileInfo(); + this.scanner = in.getScanner(false, false); + + } + + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + boolean hasNext; + if (!seeked) { + LOG.info("Seeking to start"); + hasNext = scanner.seekTo(); + seeked = true; + } else { + hasNext = scanner.next(); + } + if (!hasNext) { + return false; + } + value = scanner.getCell(); + count++; + return true; + } + + @Override + public NullWritable getCurrentKey() throws IOException, InterruptedException { + return NullWritable.get(); + } + + @Override + public Cell getCurrentValue() throws IOException, InterruptedException { + return value; + } + + @Override + public float getProgress() throws IOException, InterruptedException { + // This would be inaccurate if KVs are not uniformly-sized or we have performed a seek to + // the start row, but better than nothing anyway. + return 1.0f * count / in.getEntries(); + } + + @Override + public void close() throws IOException { + if (in != null) { + in.close(); + in = null; + } + } + } + + @Override + protected List listStatus(JobContext job) throws IOException { + List result = new ArrayList(); + + // Explode out directories that match the original FileInputFormat filters + // since HFiles are written to directories where the + // directory name is the column name + for (FileStatus status : super.listStatus(job)) { + if (status.isDirectory()) { + FileSystem fs = status.getPath().getFileSystem(job.getConfiguration()); + for (FileStatus match : fs.listStatus(status.getPath(), HIDDEN_FILE_FILTER)) { + result.add(match); + } + } else { + result.add(status); + } + } + return result; + } + + @Override + public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) + throws IOException, InterruptedException { + return new HFileRecordReader(); + } + + @Override + protected boolean isSplitable(JobContext context, Path filename) { + // This file isn't splittable. 
+ return false; + } +} \ No newline at end of file diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index c54eee0..8c794c1 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -83,9 +83,9 @@ import org.apache.hadoop.hbase.backup.impl.BackupManager; import org.apache.hadoop.hbase.backup.impl.BackupManifest; import org.apache.hadoop.hbase.backup.impl.BackupRestoreConstants; import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; -import org.apache.hadoop.hbase.backup.impl.RestoreTablesProcedure; import org.apache.hadoop.hbase.backup.master.FullTableBackupProcedure; import org.apache.hadoop.hbase.backup.master.IncrementalTableBackupProcedure; +import org.apache.hadoop.hbase.backup.master.RestoreTablesProcedure; import org.apache.hadoop.hbase.backup.util.RestoreServerUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Admin; diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java index ff5e739..ec53a64 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java @@ -284,7 +284,7 @@ public class TestBackupBase { FileSystem fs = FileSystem.get(conf1); RemoteIterator<LocatedFileStatus> it = fs.listFiles( new Path(BACKUP_ROOT_DIR), true); while(it.hasNext()){ - LOG.debug("DDEBUG: "+it.next().getPath()); + LOG.debug(it.next().getPath()); } }
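For reference, a minimal map-only driver wired to the new HFileInputFormat2 might look like the sketch below. It is illustrative only: the driver class, mapper, counter names, and input path are hypothetical, and the only assumptions about HFileInputFormat2 are what the class added in this patch shows (NullWritable keys, Cell values, non-splittable files, hidden files filtered out when directories are exploded).

// Illustrative sketch, not part of this patch. Assumes HFileInputFormat2 is
// parameterized as FileInputFormat<NullWritable, Cell>, as its record reader suggests.
package org.apache.hadoop.hbase.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

public class HFileCellCountExample {

  /** Map-only task: bump a counter for every Cell read from the HFiles. */
  static class CellCountMapper extends Mapper<NullWritable, Cell, NullWritable, NullWritable> {
    @Override
    protected void map(NullWritable key, Cell value, Context context) {
      // Counter group/name are arbitrary example values.
      context.getCounter("backup", "cells").increment(1);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "hfile-cell-count");
    job.setJarByClass(HFileCellCountExample.class);

    // HFileInputFormat2 hands each HFile to a single mapper (isSplitable returns false)
    // and filters out files starting with '_' or '.' when listing directories.
    job.setInputFormatClass(HFileInputFormat2.class);
    job.setMapperClass(CellCountMapper.class);
    job.setNumReduceTasks(0);
    job.setOutputFormatClass(NullOutputFormat.class);

    // args[0]: a directory containing HFiles, e.g. a backed-up region/family dir (hypothetical).
    FileInputFormat.addInputPath(job, new Path(args[0]));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}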