Index: src/test/org/apache/hadoop/hbase/util/TestOnlineMerge.java
===================================================================
--- src/test/org/apache/hadoop/hbase/util/TestOnlineMerge.java	(revision 0)
+++ src/test/org/apache/hadoop/hbase/util/TestOnlineMerge.java	(revision 0)
@@ -0,0 +1,109 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseClusterTestCase;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.util.ToolRunner;
+
+import java.util.List;
+
+public class TestOnlineMerge extends HBaseClusterTestCase {
+  private static final Log LOG = LogFactory.getLog(TestOnlineMerge.class);
+  private static final String TABLE_NAME = "mrtest";
+  private static final byte[] FAM = Bytes.toBytes("contents");
+  private final HTableDescriptor desc;
+  private HTable loadedTable;
+
+  public TestOnlineMerge() {
+    super(1);
+    this.conf.setInt("hbase.client.retries.number", 3);
+    this.conf.setInt("hbase.client.pause", 1000);
+    desc = new HTableDescriptor(TABLE_NAME);
+    desc.addFamily(new HColumnDescriptor(FAM));
+    // Keep regions small so that loading the table forces many splits.
+    desc.setMaxFileSize(1024 * 32);
+    desc.setMemStoreFlushSize(1024 * 16);
+  }
+
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    HBaseAdmin admin = new HBaseAdmin(conf);
+    admin.createTable(desc);
+    loadedTable = new HTable(TABLE_NAME);
+    fillTable(loadedTable);
+  }
+
+  public void testOnlineMerge() throws Exception {
+    OnlineMerge merger = new OnlineMerge(this.conf);
+    HTable meta = new HTable(HConstants.META_TABLE_NAME);
+
+    List<HRegionInfo> hirs;
+    // Regions continue to split while we're here.  Wait until it's stable.
+    do {
+      hirs = merger.getListOfMetaRows(meta);
+      Thread.sleep(200);
+    } while (hirs.size() < 9);
+
+    for (int i = 0; i < hirs.size() - 1; i += 2) {
+      int errCode = ToolRunner.run(merger,
+          new String[] {this.desc.getNameAsString(),
+                        hirs.get(i).getRegionNameAsString(),
+                        hirs.get(i + 1).getRegionNameAsString()});
+      if (errCode != 0) {
+        fail("ToolRunner didn't return 0, see logs");
+      }
+      Get get = new Get(hirs.get(i + 1).getEndKey());
+      Result res = loadedTable.get(get);
+      assertEquals(1, res.size());
+    }
+  }
+
+  // Creates about 9 regions
+  private void fillTable(HTable table) throws Exception {
+    byte[] k = new byte[3];
+    int rowCount = 0;
+    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
+      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
+        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
+          k[0] = b1;
+          k[1] = b2;
+          k[2] = b3;
+          Put put = new Put(k);
+          put.add(FAM, null, k);
+          table.put(put);
+          rowCount++;
+        }
+      }
+    }
+  }
+}
Index: src/java/org/apache/hadoop/hbase/util/OnlineMerge.java
===================================================================
--- src/java/org/apache/hadoop/hbase/util/OnlineMerge.java	(revision 0)
+++ src/java/org/apache/hadoop/hbase/util/OnlineMerge.java	(revision 0)
@@ -0,0 +1,376 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.HLog;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Utility that can merge any two regions of the same table: adjacent,
+ * overlapping or disjoint. If no regions are specified, it merges all the
+ * regions of the table, two by two.
+ */
+public class OnlineMerge extends Configured implements Tool {
+  static final Log LOG = LogFactory.getLog(OnlineMerge.class);
+  private final HBaseConfiguration conf;
+  private Path rootdir;
+  private volatile MetaUtils utils;
+  private byte [] tableName;                 // Name of table
+  private volatile byte [] region1;          // Name of region 1
+  private volatile byte [] region2;          // Name of region 2
+  private volatile boolean isMetaTable;
+  private volatile HRegionInfo mergeInfo;
+
+  /** default constructor */
+  public OnlineMerge() {
+    this(new HBaseConfiguration());
+  }
+
+  /**
+   * @param conf configuration to use for the merge
+   */
+  public OnlineMerge(HBaseConfiguration conf) {
+    super(conf);
+    this.conf = conf;
+    this.conf.setInt("hbase.client.retries.number", 3);
+    this.conf.setInt("hbase.client.pause", 1000);
+    this.mergeInfo = null;
+  }
+
+  public int run(String[] args) throws Exception {
+    if (parseArgs(args) != 0) {
+      return -1;
+    }
+
+    // Verify file system is up.
+    FileSystem fs = FileSystem.get(this.conf);          // get DFS handle
+    LOG.info("Verifying that file system is available...");
+    try {
+      FSUtils.checkFileSystemAvailable(fs);
+    } catch (IOException e) {
+      LOG.fatal("File system is not available", e);
+      return -1;
+    }
+
+    // Verify HBase is up
+    LOG.info("Verifying that HBase is running...");
+    try {
+      HBaseAdmin.checkHBaseAvailable(conf);
+    } catch (MasterNotRunningException e) {
+      LOG.fatal("HBase cluster must be on-line.");
+      return -1;
+    }
+
+    // Initialize MetaUtils and get the root of the HBase installation
+    this.utils = new MetaUtils(conf);
+    this.rootdir = FSUtils.getRootDir(this.conf);
+    try {
+      if (isMetaTable) {
+        throw new Exception("Can't merge meta tables online");
+      } else {
+        mergeRegions();
+      }
+      return 0;
+    } catch (Exception e) {
+      LOG.fatal("Merge failed", e);
+      return -1;
+    } finally {
+      if (this.utils != null) {
+        this.utils.shutdown();
+      }
+    }
+  }
+
+  /*
+   * Merges two regions from a user table. If no regions were given on the
+   * command line, merges all the regions of the table, two by two.
+   */
+  private void mergeRegions() throws IOException {
+    HTable meta = new HTable(HConstants.META_TABLE_NAME);
+    List<HRegionInfo> hris;
+    if (region1 == null) {
+      hris = getListOfMetaRows(meta);
+      if (hris.size() < 2) {
+        throw new IOException("The table doesn't have 2 or more regions");
+      }
+    } else {
+      hris = new ArrayList<HRegionInfo>(2);
+      Get get = new Get(region1);
+      get.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
+      Result res = meta.get(get);
+      HRegionInfo info1 = Writables.getHRegionInfo((res == null) ? null :
+          res.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER));
+      if (info1 == null) {
+        throw new NullPointerException("info1 is null using key " +
+            Bytes.toString(region1));
+      }
+
+      get = new Get(region2);
+      get.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
+      Result res2 = meta.get(get);
+      HRegionInfo info2 = Writables.getHRegionInfo((res2 == null) ? null :
+          res2.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER));
+      if (info2 == null) {
+        throw new NullPointerException("info2 is null using key " +
+            Bytes.toString(region2));
+      }
+      hris.add(info1);
+      hris.add(info2);
+    }
+    byte[] tableName = hris.get(0).getTableDesc().getName();
+    HTable mergedTable = new HTable(tableName);
+
+    for (int i = 0; i < hris.size() - 1; i += 2) {
+      HRegionInfo info1 = hris.get(i);
+      HRegionInfo info2 = hris.get(i + 1);
+      LOG.info("Merging regions " + info1.getRegionNameAsString() + " and " +
+          info2.getRegionNameAsString() + " in table " + Bytes.toString(tableName));
+      Get get = new Get(hris.get(i).getEndKey());
+      // This puts the location in our local cache so that when we do a get
+      // we go directly to the RS
+      mergedTable.get(get);
+
+      HBaseAdmin admin = new HBaseAdmin(conf);
+      offlineRegion(info1, meta);
+      offlineRegion(info2, meta);
+      admin.closeRegion(info1.getRegionName());
+      admin.closeRegion(info2.getRegionName());
+
+      LOG.info("Making sure the region is down");
+      HRegionLocation loc = mergedTable.getConnection().
+          getRegionLocation(tableName, info1.getEndKey(), false);
+      while (true) {
+        try {
+          mergedTable.getConnection().getHRegionConnection(
+              loc.getServerAddress()).get(info1.getRegionName(), get);
+          LOG.info("Waiting a bit until it's closed");
+          try {
+            Thread.sleep(100);
+          } catch (InterruptedException e) {
+            LOG.error("Interrupted during the sleep", e);
+          }
+        } catch (RemoteException ex) {
+          IOException ioe = RemoteExceptionHandler.decodeRemoteException(ex);
+          if (ioe instanceof NotServingRegionException ||
+              ioe.getCause() instanceof NotServingRegionException) {
+            break;
+          }
+        }
+      }
+
+      HRegion merged = merge(info1, info2, meta);
+
+      // Insert the newly merged region into .META.
+      LOG.info("Adding " + merged.getRegionInfo() + " to .META.");
+
+      Put put = new Put(merged.getRegionName());
+      put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
+          Writables.getBytes(merged.getRegionInfo()));
+      meta.put(put);
+      merged.close();
+    }
+  }
+
+  /**
+   * Gets the list of HRegionInfos for the live regions found in .META.
+   * @param meta HTable pointing to the .META. table
+   * @return list of HRegionInfos, skipping regions that are offline or split
+   * @throws IOException
+   */
+  List<HRegionInfo> getListOfMetaRows(HTable meta) throws IOException {
+    List<HRegionInfo> hris = new ArrayList<HRegionInfo>();
+    Scan scan = new Scan();
+    ResultScanner resScan = meta.getScanner(scan);
+    for (Result res : resScan) {
+      HRegionInfo hri =
+          Writables.getHRegionInfo(res.getValue(HConstants.CATALOG_FAMILY,
+              HConstants.REGIONINFO_QUALIFIER));
+      if (hri.isOffline() || hri.isSplit()) {
+        LOG.info("Region split or offline " + hri.getRegionNameAsString());
+        continue;
+      }
+      hris.add(hri);
+      LOG.info(hri.getRegionNameAsString());
+    }
+    return hris;
+  }
+
+  // Mark this region offline in .META. so that we don't reassign it
+  private void offlineRegion(HRegionInfo r, HTable meta) throws IOException {
+    r.setOffline(true);
+    Put p = new Put(r.getRegionName());
+    p.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
+        Writables.getBytes(r));
+    meta.put(p);
+  }
+
+  /*
+   * Actually merge two regions and update their info in the meta region(s).
+   * If the meta is split, meta1 may be different from meta2 (and we may have
+   * to scan the meta if the resulting merged region does not go in either).
+   * Returns the HRegion object for the newly merged region.
+   */
+  private HRegion merge(HRegionInfo info1, HRegionInfo info2, HTable meta)
+      throws IOException {
+    HRegion merged = null;
+    HLog log = utils.getLog();
+    HRegion r1 = HRegion.openHRegion(info1, this.rootdir, log, this.conf);
+    try {
+      HRegion r2 = HRegion.openHRegion(info2, this.rootdir, log, this.conf);
+      try {
+        merged = HRegion.merge(r1, r2);
+      } finally {
+        if (!r2.isClosed()) {
+          r2.close();
+        }
+      }
+    } finally {
+      if (!r1.isClosed()) {
+        r1.close();
+      }
+    }
+
+    // Remove the old regions from meta.
+    // HRegion.merge has already deleted their files.
+    removeRegionFromMeta(meta, info1);
+    removeRegionFromMeta(meta, info2);
+
+    this.mergeInfo = merged.getRegionInfo();
+    return merged;
+  }
+
+  /*
+   * Removes a region's meta information from the passed meta region.
+   *
+   * @param meta .META. HTable to be updated
+   * @param regioninfo HRegionInfo of region to remove from meta
+   *
+   * @throws IOException
+   */
+  private void removeRegionFromMeta(HTable meta, HRegionInfo regioninfo)
+      throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Removing region: " + regioninfo + " from " + meta);
+    }
+
+    Delete delete = new Delete(regioninfo.getRegionName(),
+        System.currentTimeMillis(), null);
+    meta.delete(delete);
+  }
+
+  /*
+   * Parses the command-line arguments: a table name, optionally followed by
+   * the names of the two regions to merge.
+   *
+   * @param args command-line arguments, minus the generic Hadoop options
+   * @return 0 if the arguments are valid, -1 otherwise
+   */
+  private int parseArgs(String[] args) {
+    GenericOptionsParser parser =
+        new GenericOptionsParser(this.getConf(), args);
+
+    String[] remainingArgs = parser.getRemainingArgs();
+    if (remainingArgs.length != 1 && remainingArgs.length != 3) {
+      usage();
+      return -1;
+    }
+    tableName = Bytes.toBytes(remainingArgs[0]);
+    isMetaTable = Bytes.compareTo(tableName, HConstants.META_TABLE_NAME) == 0;
+    int status = 0;
+
+    if (remainingArgs.length > 1) {
+      region1 = Bytes.toBytesBinary(remainingArgs[1]);
+      region2 = Bytes.toBytesBinary(remainingArgs[2]);
+      if (notInTable(tableName, region1) || notInTable(tableName, region2)) {
+        status = -1;
+      } else if (Bytes.equals(region1, region2)) {
+        LOG.error("Can't merge a region with itself");
+        status = -1;
+      }
+    }
+    return status;
+  }
+
+  private boolean notInTable(final byte [] tn, final byte [] rn) {
+    if (WritableComparator.compareBytes(tn, 0, tn.length, rn, 0, tn.length) != 0) {
+      LOG.error("Region " + Bytes.toString(rn) + " does not belong to table " +
+          Bytes.toString(tn));
+      return true;
+    }
+    return false;
+  }
+
+  private void usage() {
+    System.err.println(
+        "Usage: bin/hbase onlinemerge <table-name> [<region-name-1> <region-name-2>]\n");
+  }
+
+  /**
+   * Main program
+   *
+   * @param args command-line arguments
+   */
+  public static void main(String[] args) {
+    int status = 0;
+    try {
+      status = ToolRunner.run(new OnlineMerge(), args);
+    } catch (Exception e) {
+      LOG.error("exiting due to error", e);
+      status = -1;
+    }
+    System.exit(status);
+  }
+}
Index: bin/hbase
===================================================================
--- bin/hbase	(revision 955784)
+++ bin/hbase	(working copy)
@@ -194,6 +194,8 @@
   if [ "$1" != "stop" ] ; then
     HBASE_OPTS="$HBASE_OPTS $HBASE_ZOOKEEPER_OPTS"
   fi
+elif [ "$COMMAND" = "onlinemerge" ] ; then
+  CLASS='org.apache.hadoop.hbase.util.OnlineMerge'
 else
   CLASS=$COMMAND
 fi
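
Usage note: once the patch is applied, the tool is run through the new bin/hbase command added above. A rough sketch of an invocation against the table used in the test; the region names are placeholders for real region names, for example as printed by the tool's own region listing or shown in the master web UI:

  bin/hbase onlinemerge mrtest                                    # merge all regions of the table, two by two
  bin/hbase onlinemerge mrtest <region-name-1> <region-name-2>    # merge two specific regions

The command exits non-zero if the file system or the cluster is unavailable, if the target table is .META., or if the two region names are identical or do not belong to the given table.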