Index: bin/hbase =================================================================== --- bin/hbase (revision 1515006) +++ bin/hbase (working copy) @@ -90,6 +90,7 @@ echo "PACKAGE MANAGEMENT" echo " classpath dump hbase CLASSPATH" echo " version print the version" + echo " upgradeTo96 upgrade hbase to 0.96" echo "" echo " or" echo " CLASSNAME run the class named CLASSNAME" @@ -271,6 +272,8 @@ CLASS='org.apache.hadoop.hbase.io.hfile.HFile' elif [ "$COMMAND" = "zkcli" ] ; then CLASS="org.apache.hadoop.hbase.zookeeper.ZooKeeperMainServer" +elif [ "$COMMAND" = "upgradeTo96" ] ; then + CLASS="org.apache.hadoop.hbase.migration.UpgradeTo96" elif [ "$COMMAND" = "master" ] ; then CLASS='org.apache.hadoop.hbase.master.HMaster' if [ "$1" != "stop" ] ; then Index: hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java (revision 1515006) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java (working copy) @@ -427,7 +427,7 @@ * for use as content of a this.peersZNode; i.e. the content of PEER_ID znode under * /hbase/replication/peers/PEER_ID */ - private static byte[] toByteArray(final String clusterKey) { + public static byte[] toByteArray(final String clusterKey) { byte[] bytes = ZooKeeperProtos.ReplicationPeer.newBuilder().setClusterkey(clusterKey).build() .toByteArray(); Index: hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java (revision 1515006) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java (working copy) @@ -54,7 +54,7 @@ // Public for testing public static final byte[] ENABLED_ZNODE_BYTES = toByteArray(ZooKeeperProtos.ReplicationState.State.ENABLED); - protected static final byte[] DISABLED_ZNODE_BYTES = + public static final byte[] DISABLED_ZNODE_BYTES = toByteArray(ZooKeeperProtos.ReplicationState.State.DISABLED); public ReplicationStateZKBase(ZooKeeperWatcher zookeeper, Configuration conf, Index: hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java =================================================================== --- hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java (revision 1515006) +++ hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java (working copy) @@ -90,7 +90,7 @@ // znode containing ephemeral nodes of the draining regionservers public String drainingZNode; // znode of currently active master - private String masterAddressZNode; + public String masterAddressZNode; // znode of this master in backup master directory, if not the active master public String backupMasterAddressesZNode; // znode containing the current cluster state Index: hbase-server/src/main/java/org/apache/hadoop/hbase/migration/NamespaceUpgrade.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/migration/NamespaceUpgrade.java (revision 1515006) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/migration/NamespaceUpgrade.java (working copy) @@ -105,6 +105,7 @@ public void init() throws IOException { this.rootDir = FSUtils.getRootDir(conf); + FSUtils.setFsDefault(getConf(), rootDir); this.fs = FileSystem.get(conf); Path tmpDataDir = new Path(rootDir, TMP_DATA_DIR); sysNsDir = new Path(tmpDataDir, NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR); Index: hbase-server/src/main/java/org/apache/hadoop/hbase/migration/UpgradeTo96.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/migration/UpgradeTo96.java (revision 0) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/migration/UpgradeTo96.java (working copy) @@ -0,0 +1,246 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.migration; + +import java.io.IOException; +import java.util.List; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.HFileV1Detector; +import org.apache.hadoop.hbase.util.ZKDataMigrator; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +public class UpgradeTo96 extends Configured implements Tool { + static final Log LOG = LogFactory.getLog(UpgradeTo96.class); + private Options options = new Options(); + private boolean upgradeNS = false, upgradeZnode = false, upgrade = false, + checkForHFileV1 = false; + /** + * Path of directory to check for HFileV1 + */ + private String dirToCheckForHFileV1; + + UpgradeTo96() { + setOptions(); + } + + private void setOptions() { + Option nsOption = new Option("ns", "namespace", false, "Upgrade the namespace to 0.96"); + options.addOption(nsOption); + options.addOption("h", "help", false, "Help"); + options.addOption(new Option("zk", "Znodes", false, "Upgrade the znodes")); + options.addOption(new Option("chkHFileV1", "checkForHFileV1", false, "Check for HFileV1")); + options.addOption(new Option("upgrade", "upgrade-NS-ZK", false, + "Upgrade both namespace and znodes")); + Option pathOption = new Option("dirTocheckHFileV1", "dirToCheckForHFileV1", true, + "Relative path of directory to check for HFileV1. Default is hbase.rootdir"); + pathOption.setRequired(false); + options.addOption(pathOption); + } + + private boolean parseOption(String[] args) throws ParseException { + if (args.length == 0) { + return false; // no args will process with default values. + } + CommandLineParser parser = new GnuParser(); + CommandLine cmd = parser.parse(options, args); + if (cmd.hasOption("h")) { + printUsage(); + return false; + } + if (cmd.hasOption("ns")) upgradeNS = true; + if (cmd.hasOption("zk")) upgradeZnode = true; + if (cmd.hasOption("upgrade")) upgrade = true; + if(cmd.hasOption("chkHFileV1")) checkForHFileV1 = true; + if(checkForHFileV1 && cmd.hasOption("dirTocheckHFileV1")) { + this.dirToCheckForHFileV1 = cmd.getOptionValue("dirTocheckHFileV1"); + } + return true; + } + + private void printUsage() { + System.out.println("This tool helps in upgrading to 0.96. The upgrade involves major " + + "compacting any HFileV1, upgrading file system layout for namespaces, and updating " + + "znodes. Please follow the following steps:"); + System.out.println(); + System.out.println("1) Major compact HFileV1: On a running pre-0.96 (i.e., 0.94.x, 0.92.x)" + + " cluster, run this tool using option -checkHFileV1. Look at the" + + " console output if there are any regions to major compact. (grep for \"Regions to Major" + + " compact\" on the console output). Major compact the listed regions. Keep on " + + "repeating this step till there are no regions to major compact. If none, please " + + "shutdown the hbase cluster. As we need to update znodes, ensure that zookeeper is " + + "up and running."); + System.out.println(); + System.out.println("2) Upgrade Namespace and Znodes: After step 1) run this tool using " + + "option -upgrade. In case you want to upgrade either of them, use option -ns, or -zk " + + "to upgrade namespace and znodes, respectively."); + System.out.println(); + System.out.println("Refer below for more help on options."); + System.out.println(); + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp("UpgradeTo96", options, true); + } + + @Override + public int run(String[] args) throws Exception { + if (!parseOption(args)) { + printUsage(); + return -1; + } + if (checkForHFileV1) { + int res = doHFileV1Check(); + if (res == 0) System.out.println("No HFileV1 found."); + else { + System.out.println("ERROR: Please look for corrupt files, or regions to major compact."); + // return to avoid upgrade. + return -1; + } + } + if (checkForLiveProcesses()) { + System.err.println("ERROR: Some HBase processes are still alive or znodes not expired." + + "Please stop them before upgrade or try after some time."); + return -1; + } + if (upgrade) { // user intends to upgrade both ns and zk + return upgradeNamespaceAndZnodes(); + } else { + if (upgradeNS) { + return upgradeNamespace(); + } else if (upgradeZnode) { + return upgradeZnodes(); + } + } + return -1; + } + + private boolean checkForLiveProcesses() throws IOException { + ZooKeeperWatcher zkw = null; + try { + zkw = new ZooKeeperWatcher(getConf(), "Check Live Processes.", + new Abortable() { + private boolean aborted = false; + + @Override + public void abort(String why, Throwable e) { + System.err.println("Got aborted with reason: " + why + + ", and error: " + e); + this.aborted = true; + } + + @Override + public boolean isAborted() { + return this.aborted; + } + }); + boolean liveProcessesExists = false; + if (ZKUtil.checkExists(zkw, zkw.baseZNode) == -1) { + return false; + } + if (ZKUtil.checkExists(zkw, zkw.backupMasterAddressesZNode) != -1) { + List backupMasters = ZKUtil.listChildrenNoWatch(zkw, + zkw.backupMasterAddressesZNode); + if (!backupMasters.isEmpty()) { + System.err.println("Backup master(s) " + backupMasters + + " are alive or backup-master znodes not expired."); + liveProcessesExists = true; + } + } + if (ZKUtil.checkExists(zkw, zkw.rsZNode) != -1) { + List regionServers = ZKUtil.listChildrenNoWatch(zkw, + zkw.rsZNode); + if (!regionServers.isEmpty()) { + System.err.println("Region server(s) " + regionServers + + " are alive or rs znodes not expired."); + liveProcessesExists = true; + } + } + if (ZKUtil.checkExists(zkw, zkw.masterAddressZNode) != -1) { + byte[] data = ZKUtil.getData(zkw, zkw.masterAddressZNode); + if (data != null && !Bytes.equals(data, HConstants.EMPTY_BYTE_ARRAY)) { + System.err.println("Active master at address " + Bytes.toString(data) + + " is still alive or master znode not expired."); + liveProcessesExists = true; + } + } + return liveProcessesExists; + } catch (Exception e) { + System.err.println(e); + throw new IOException(e); + } finally { + if (zkw != null) { + zkw.close(); + } + } + } + + private int doHFileV1Check() throws Exception { + String[] args = null; + if (dirToCheckForHFileV1 != null) args = new String[] { "-p" + dirToCheckForHFileV1 }; + return ToolRunner.run(getConf(), new HFileV1Detector(), args); + } + + private int upgradeNamespaceAndZnodes() throws Exception { + int res = upgradeNamespace(); + res = upgradeZnodes(); + return res; + } + + private int upgradeNamespace() throws Exception { + System.out.println("Upgrading Namespace"); + try { + int res = ToolRunner.run(getConf(), new NamespaceUpgrade(), new String[] { "--upgrade" }); + System.out.println("Successfully Upgraded NameSpace."); + return res; + } catch (Exception e) { + System.err.println("FAILURE: while updating Namespace"); + throw new IOException(e); + } + } + + private int upgradeZnodes() throws Exception { + System.out.println("Upgrading Znodes"); + try { + int res = ToolRunner.run(getConf(), new ZKDataMigrator(), null); + System.out.println("Succesfully upgraded znodes."); + return res; + } catch (Exception e) { + System.err.println("FAILURE: while updating Znodes"); + throw new IOException(e); + } + } + + public static void main(String[] args) throws Exception { + System.exit(ToolRunner.run(HBaseConfiguration.create(), new UpgradeTo96(), args)); + } +} Index: hbase-server/src/main/java/org/apache/hadoop/hbase/util/HFileV1Detector.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/util/HFileV1Detector.java (revision 1515006) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/util/HFileV1Detector.java (working copy) @@ -69,9 +69,27 @@ private static final Log LOG = LogFactory.getLog(HFileV1Detector.class); private static final int DEFAULT_NUM_OF_THREADS = 10; private int numOfThreads; - private Path dirToProcess; + /** + * directory to start the processing. + */ + private Path targetDirPath; + /** + * executor for processing regions. + */ + private ExecutorService exec; + + /** + * Keeps record of processed tables. + */ + private final Set processedTables = new HashSet(); + /** + * set of corrupted HFiles (with undetermined major version) + */ private final Set corruptedHFiles = Collections .newSetFromMap(new ConcurrentHashMap()); + /** + * set of HfileV1; + */ private final Set hFileV1Set = Collections .newSetFromMap(new ConcurrentHashMap()); @@ -107,7 +125,7 @@ } if (cmd.hasOption("p")) { - dirToProcess = new Path(cmd.getOptionValue("p")); + this.targetDirPath = new Path(FSUtils.getRootDir(getConf()), cmd.getOptionValue("p")); } try { if (cmd.hasOption("n")) { @@ -117,7 +135,7 @@ + " Continuing with default value " + DEFAULT_NUM_OF_THREADS); return true; } - numOfThreads = n; + this.numOfThreads = n; } } catch (NumberFormatException nfe) { System.err.println("Please select a valid number for threads"); @@ -126,56 +144,78 @@ return true; } + /** + * Checks for HFileV1. + * @return 0 in case no HFileV1 is present. + * 1 in case a HFileV1 is present. or there are files which have corrupt major version + * (neither V1 nor V2). + */ @Override public int run(String args[]) throws IOException, ParseException { fs = FileSystem.get(getConf()); + FSUtils.setFsDefault(getConf(), new Path(fs.getUri())); numOfThreads = DEFAULT_NUM_OF_THREADS; - dirToProcess = FSUtils.getRootDir(getConf()); + targetDirPath = FSUtils.getRootDir(getConf()); if (!parseOption(args)) { - System.exit(1); + System.exit(-1); } - ExecutorService exec = Executors.newFixedThreadPool(numOfThreads); - Set regionsWithHFileV1; + this.exec = Executors.newFixedThreadPool(numOfThreads); try { - regionsWithHFileV1 = checkForV1Files(dirToProcess, exec); - printHRegionsWithHFileV1(regionsWithHFileV1); - printAllHFileV1(); - printCorruptedHFiles(); - if (hFileV1Set.isEmpty() && corruptedHFiles.isEmpty()) { - // all clear. - System.out.println("No HFile V1 Found"); - } + return processResult(checkForV1Files(targetDirPath)); } catch (Exception e) { System.err.println(e); - return 1; + throw new IOException(e); } finally { exec.shutdown(); fs.close(); } - return 0; } + private int processResult(Set regionsWithHFileV1) { + System.out.println("Result: "); + System.out.println(); + printSet(processedTables, "Tables Processed: "); + System.out.println(); + System.out.println("Number of HFileV1: " + hFileV1Set.size()); + printSet(hFileV1Set, "HFileV1:"); + System.out.println(); + System.out.println("Number of corrupted files:" + corruptedHFiles.size()); + printSet(corruptedHFiles, "Corrupted Files: "); + System.out.println(); + System.out.println("Number of Regions with HFileV1: " + regionsWithHFileV1.size()); + printSet(regionsWithHFileV1, "Regions to Major Compact: "); + System.out.println(); + return (hFileV1Set.isEmpty() && corruptedHFiles.isEmpty()) ? 0 : 1 ; + } + + private void printSet(Set result, String msg) { + System.out.println(msg); + for (Path p : result) { + System.out.println(p); + } + } + /** * Takes a directory path, and lists out any HFileV1, if present. * @param targetDir directory to start looking for HFilev1. - * @param exec * @return set of Regions that have HFileV1 * @throws IOException */ - private Set checkForV1Files(Path targetDir, final ExecutorService exec) throws IOException { - if (isTableDir(fs, targetDir)) { - return processTable(targetDir, exec); - } - // user has passed a hbase installation directory. + private Set checkForV1Files(Path targetDir) throws IOException { if (!fs.exists(targetDir)) { throw new IOException("The given path does not exist: " + targetDir); } + if (isTableDir(fs, targetDir)) { + processedTables.add(targetDir); + return processTable(targetDir); + } Set regionsWithHFileV1 = new HashSet(); FileStatus[] fsStats = fs.listStatus(targetDir); for (FileStatus fsStat : fsStats) { if (isTableDir(fs, fsStat.getPath())) { + processedTables.add(fsStat.getPath()); // look for regions and find out any v1 file. - regionsWithHFileV1.addAll(processTable(fsStat.getPath(), exec)); + regionsWithHFileV1.addAll(processTable(fsStat.getPath())); } else { LOG.info("Ignoring path: " + fsStat.getPath()); } @@ -184,13 +224,12 @@ } /** - * Find out the regions in the table which has an HFile v1 in it. + * Find out regions in the table which have HFileV1. * @param tableDir - * @param exec * @return the set of regions containing HFile v1. * @throws IOException */ - private Set processTable(Path tableDir, final ExecutorService exec) throws IOException { + private Set processTable(Path tableDir) throws IOException { // list out the regions and then process each file in it. LOG.info("processing table: " + tableDir); List> regionLevelResults = new ArrayList>(); @@ -200,7 +239,7 @@ for (FileStatus fsStat : fsStats) { // process each region if (isRegionDir(fs, fsStat.getPath())) { - regionLevelResults.add(processRegion(fsStat.getPath(), exec)); + regionLevelResults.add(processRegion(fsStat.getPath())); } } for (Future f : regionLevelResults) { @@ -221,10 +260,9 @@ * Each region is processed by a separate handler. If a HRegion has a hfileV1, its path is * returned as the future result, otherwise, a null value is returned. * @param regionDir Region to process. - * @param exec * @return corresponding Future object. */ - private Future processRegion(final Path regionDir, final ExecutorService exec) { + private Future processRegion(final Path regionDir) { LOG.info("processing region: " + regionDir); Callable regionCallable = new Callable() { @Override @@ -271,7 +309,9 @@ } private static boolean isTableDir(final FileSystem fs, final Path path) throws IOException { - return FSTableDescriptors.getTableInfoPath(fs, path) != null; + // check for old format, of having /table/.tableinfo + return (FSTableDescriptors.getTableInfoPath(fs, path) != null || FSTableDescriptors + .getCurrentTableInfoStatus(fs, path, false) != null); } private static boolean isRegionDir(final FileSystem fs, final Path path) throws IOException { @@ -280,43 +320,6 @@ } - private void printHRegionsWithHFileV1(Set regionsHavingHFileV1) { - if (!regionsHavingHFileV1.isEmpty()) { - System.out.println(); - System.out.println("Following regions has HFileV1 and needs to be Major Compacted:"); - System.out.println(); - for (Path r : regionsHavingHFileV1) { - System.out.println(r); - } - System.out.println(); - } - } - - private void printAllHFileV1() { - if (!hFileV1Set.isEmpty()) { - System.out.println(); - System.out.println("Following HFileV1 are found:"); - System.out.println(); - for (Path r : hFileV1Set) { - System.out.println(r); - } - System.out.println(); - } - - } - - private void printCorruptedHFiles() { - if (!corruptedHFiles.isEmpty()) { - System.out.println(); - System.out.println("Following HFiles are corrupted as their version is unknown:"); - System.out.println(); - for (Path r : corruptedHFiles) { - System.out.println(r); - } - System.out.println(); - } - } - public static void main(String args[]) throws Exception { System.exit(ToolRunner.run(HBaseConfiguration.create(), new HFileV1Detector(), args)); } Index: hbase-server/src/main/java/org/apache/hadoop/hbase/util/ZKDataMigrator.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/util/ZKDataMigrator.java (revision 0) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/util/ZKDataMigrator.java (working copy) @@ -0,0 +1,298 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.io.IOException; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos; +import org.apache.hadoop.hbase.replication.ReplicationPeersZKImpl; +import org.apache.hadoop.hbase.replication.ReplicationStateZKBase; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.NoNodeException; + +/** + * Tool to migrate zookeeper data of older hbase versions(<0.95.0) to PB. + */ +public class ZKDataMigrator extends Configured implements Tool { + + private static final Log LOG = LogFactory.getLog(ZKDataMigrator.class); + + @Override + public int run(String[] as) throws Exception { + Configuration conf = getConf(); + ZooKeeperWatcher zkw = null; + try { + zkw = new ZooKeeperWatcher(getConf(), "Migrate ZK data to PB.", + new ZKDataMigratorAbortable()); + FileSystem fs = FileSystem.get(getConf()); + FSUtils.setFsDefault(conf, new Path(fs.getUri())); + if (ZKUtil.checkExists(zkw, zkw.baseZNode) == -1) { + System.out + .println("No hbase related data available in zookeeper. returning.."); + return 0; + } + List children = ZKUtil.listChildrenNoWatch(zkw, zkw.baseZNode); + if (children == null) { + System.out.println("No child nodes to mirgrate. returning.."); + return 0; + } + String childPath = null; + for (String child : children) { + childPath = ZKUtil.joinZNode(zkw.baseZNode, child); + if (child.equals(conf.get("zookeeper.znode.rootserver", + "root-region-server"))) { + // -ROOT- region no longer present from 0.95.0, so we can remove this + // znode + ZKUtil.deleteNodeRecursively(zkw, childPath); + // TODO delete root table path from file system. + } else if (child.equals(conf.get("zookeeper.znode.rs", "rs"))) { + // Since there is no live region server instance during migration, we + // can remove this znode as well. + ZKUtil.deleteNodeRecursively(zkw, childPath); + } else if (child.equals(conf.get("zookeeper.znode.draining.rs", + "draining"))) { + // If we want to migrate to 0.95.0 from older versions we need to stop + // the existing cluster. So there wont be any draining servers so we + // can + // remove it. + ZKUtil.deleteNodeRecursively(zkw, childPath); + } else if (child.equals(conf.get("zookeeper.znode.master", "master"))) { + // Since there is no live master instance during migration, we can + // remove this znode as well. + ZKUtil.deleteNodeRecursively(zkw, childPath); + } else if (child.equals(conf.get("zookeeper.znode.backup.masters", + "backup-masters"))) { + // Since there is no live backup master instances during migration, we + // can remove this znode as well. + ZKUtil.deleteNodeRecursively(zkw, childPath); + } else if (child.equals(conf.get("zookeeper.znode.state", "shutdown"))) { + // shutdown node is not present from 0.95.0 onwards. Its renamed to + // "running". We can delete it. + ZKUtil.deleteNodeRecursively(zkw, childPath); + } else if (child.equals(conf.get("zookeeper.znode.unassigned", + "unassigned"))) { + // Any way during clean cluster startup we will remove all unassigned + // region nodes. we can delete all children nodes as well. This znode + // is + // renamed to "regions-in-transition" from 0.95.0 onwards. + ZKUtil.deleteNodeRecursively(zkw, childPath); + } else if (child.equals(conf.get("zookeeper.znode.tableEnableDisable", + "table")) + || child.equals(conf.get( + "zookeeper.znode.masterTableEnableDisable", "table"))) { + checkAndMigrateTableStatesToPB(zkw); + } else if (child.equals(conf.get( + "zookeeper.znode.masterTableEnableDisable92", "table92"))) { + // This is replica of table states from tableZnode so we can remove + // this. + ZKUtil.deleteNodeRecursively(zkw, childPath); + } else if (child.equals(conf + .get("zookeeper.znode.splitlog", "splitlog"))) { + // This znode no longer available from 0.95.0 onwards, we can remove + // it. + ZKUtil.deleteNodeRecursively(zkw, childPath); + } else if (child.equals(conf.get("zookeeper.znode.replication", + "replication"))) { + checkAndMigrateReplicationNodesToPB(zkw); + } else if (child.equals(conf.get("zookeeper.znode.clusterId", "hbaseid"))) { + // it will be re-created by master. + ZKUtil.deleteNodeRecursively(zkw, childPath); + } else if (child.equals(SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION)) { + // not needed as it is transient. + ZKUtil.deleteNodeRecursively(zkw, childPath); + } + } + } catch (Exception e) { + System.err.println(e); + throw new IOException(e); + } finally { + if (zkw != null) { + zkw.close(); + } + } + return 0; + } + + private void checkAndMigrateTableStatesToPB(ZooKeeperWatcher zkw) + throws KeeperException { + List tables = ZKUtil.listChildrenNoWatch(zkw, zkw.tableZNode); + if (tables == null) { + System.out + .println("No table present to migrate table state to PB. returning.."); + } + for (String table : tables) { + LOG.warn("Handling table: "+ table); + + String znode = ZKUtil.joinZNode(zkw.tableZNode, table); + // Delete -ROOT- table state znode since its no longer present in 0.95.0 + // onwards. + if (table.equals("-ROOT-") || table.equals(".META.")) { + ZKUtil.deleteNode(zkw, znode); + continue; + } + byte[] data = ZKUtil.getData(zkw, znode); + if (ProtobufUtil.isPBMagicPrefix(data)) + continue; + ZooKeeperProtos.Table.Builder builder = ZooKeeperProtos.Table + .newBuilder(); + builder + .setState(ZooKeeperProtos.Table.State.valueOf(Bytes.toString(data))); + data = ProtobufUtil.prependPBMagic(builder.build().toByteArray()); + LOG.warn("Handling znode: "+znode); + ZKUtil.setData(zkw, znode, data); + } + } + + private void checkAndMigrateReplicationNodesToPB(ZooKeeperWatcher zkw) + throws KeeperException { + String replicationZnodeName = getConf().get("zookeeper.znode.replication", + "replication"); + String replicationPath = ZKUtil.joinZNode(zkw.baseZNode, + replicationZnodeName); + List replicationZnodes = ZKUtil.listChildrenNoWatch(zkw, + replicationPath); + if (replicationZnodes == null) { + System.out + .println("No replication related znodes present to migrate. returning.."); + } + for (String child : replicationZnodes) { + String znode = ZKUtil.joinZNode(replicationPath, child); + if (child.equals(getConf().get("zookeeper.znode.replication.peers", + "peers"))) { + List peers = ZKUtil.listChildrenNoWatch(zkw, znode); + if (peers == null || peers.isEmpty()) { + System.out.println("No peers present to migrate. returning.."); + continue; + } + checkAndMigratePeerZnodesToPB(zkw, znode, peers); + } else if (child.equals(getConf().get( + "zookeeper.znode.replication.state", "state"))) { + // This is no longer used in >=0.95.x + ZKUtil.deleteNodeRecursively(zkw, znode); + } else if (child.equals(getConf().get("zookeeper.znode.replication.rs", + "rs"))) { + List rsList = ZKUtil.listChildrenNoWatch(zkw, znode); + if (rsList == null || rsList.isEmpty()) continue; + for (String rs : rsList) { + checkAndMigrateQueuesToPB(zkw, znode, rs); + } + } + } + } + + private void checkAndMigrateQueuesToPB(ZooKeeperWatcher zkw, String znode, String rs) + throws KeeperException, NoNodeException { + String rsPath = ZKUtil.joinZNode(znode, rs); + List peers = ZKUtil.listChildrenNoWatch(zkw, rsPath); + if (peers == null || peers.isEmpty()) + return; + String peerPath = null; + for (String peer : peers) { + peerPath = ZKUtil.joinZNode(rsPath, peer); + List files = ZKUtil.listChildrenNoWatch(zkw, peerPath); + if (files == null || files.isEmpty()) continue; + String filePath = null; + for (String file : files) { + filePath = ZKUtil.joinZNode(peerPath, file); + byte[] data = ZKUtil.getData(zkw, filePath); + if(data == null || Bytes.equals(data, HConstants.EMPTY_BYTE_ARRAY)) continue; + if (ProtobufUtil.isPBMagicPrefix(data)) continue; + ZKUtil.setData(zkw, filePath, ZKUtil.positionToByteArray(Long + .parseLong(Bytes.toString(data)))); + } + } + } + + private void checkAndMigratePeerZnodesToPB(ZooKeeperWatcher zkw, String znode, + List peers) throws KeeperException, NoNodeException { + for (String peer : peers) { + String peerZnode = ZKUtil.joinZNode(znode, peer); + LOG.debug("peer to process:"+peer); + byte[] data = ZKUtil.getData(zkw, peerZnode); + LOG.debug("daa: "+Bytes.toString(data)); + if (!ProtobufUtil.isPBMagicPrefix(data)) { + migrateClusterKeyToPB(zkw, peerZnode, data); + } + String peerStatePath = ZKUtil.joinZNode( + peerZnode, + getConf().get("zookeeper.znode.replication.peers.state", + "peer-state")); + if (ZKUtil.checkExists(zkw, peerStatePath) != -1) { + data = ZKUtil.getData(zkw, peerStatePath); + if (ProtobufUtil.isPBMagicPrefix(data)) continue; + migratePeerStateToPB(zkw, data, peerStatePath); + } + } + } + + private void migrateClusterKeyToPB(ZooKeeperWatcher zkw, String peerZnode, + byte[] data) throws KeeperException, NoNodeException { + ZKUtil.setData(zkw, peerZnode, + ReplicationPeersZKImpl.toByteArray(Bytes.toString(data))); + } + + private void migratePeerStateToPB(ZooKeeperWatcher zkw, byte[] data, + String peerStatePath) throws KeeperException, NoNodeException { + String state = Bytes.toString(data); + if (ZooKeeperProtos.ReplicationState.State.ENABLED.name().equals( + state)) { + ZKUtil.setData(zkw, peerStatePath, + ReplicationStateZKBase.ENABLED_ZNODE_BYTES); + } else if (ZooKeeperProtos.ReplicationState.State.DISABLED.name() + .equals(state)) { + ZKUtil.setData(zkw, peerStatePath, + ReplicationStateZKBase.DISABLED_ZNODE_BYTES); + } + } + + public static void main(String args[]) throws Exception { + System.exit(ToolRunner.run(HBaseConfiguration.create(), + new ZKDataMigrator(), args)); + } + + static class ZKDataMigratorAbortable implements Abortable { + private boolean aborted = false; + @Override + public void abort(String why, Throwable e) { + System.err.println("Got aborted with reason: " + why + ", and error: " + e); + this.aborted = true; + } + + @Override + public boolean isAborted() { + return this.aborted; + } + } + +} Index: hbase-server/src/test/java/org/apache/hadoop/hbase/catalog/TestMetaMigrationConvertingToPB.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/catalog/TestMetaMigrationConvertingToPB.java (revision 1515006) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/catalog/TestMetaMigrationConvertingToPB.java (working copy) @@ -50,7 +50,6 @@ import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Durability; -import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.util.ToolRunner; @@ -179,15 +178,6 @@ } @Test - public void testMetaUpdatedFlagInROOT() throws Exception { - HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster(); - boolean metaUpdated = MetaMigrationConvertingToPB. - isMetaTableUpdated(master.getCatalogTracker()); - assertEquals(true, metaUpdated); - verifyMetaRowsAreUpdated(master.getCatalogTracker()); - } - - @Test public void testMetaMigration() throws Exception { LOG.info("Starting testMetaMigration"); final byte [] FAMILY = Bytes.toBytes("family"); Index: hbase-server/src/test/java/org/apache/hadoop/hbase/migration/TestNamespaceUpgrade.java =================================================================== --- hbase-server/src/test/java/org/apache/hadoop/hbase/migration/TestNamespaceUpgrade.java (revision 1515006) +++ hbase-server/src/test/java/org/apache/hadoop/hbase/migration/TestNamespaceUpgrade.java (working copy) @@ -150,7 +150,7 @@ } } - private static File untar(final File testdir) throws IOException { + static File untar(final File testdir) throws IOException { // Find the src data under src/test/data final String datafile = "TestNamespaceUpgrade"; File srcTarFile = new File(