Index: src/test/java/org/apache/hadoop/hbase/util/TestOnlineMerge.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/util/TestOnlineMerge.java	(revision 0)
+++ src/test/java/org/apache/hadoop/hbase/util/TestOnlineMerge.java	(revision 0)
@@ -0,0 +1,111 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseClusterTestCase;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.util.ToolRunner;
+
+import java.util.List;
+
+public class TestOnlineMerge extends HBaseClusterTestCase {
+  private static final Log LOG = LogFactory.getLog(TestOnlineMerge.class);
+  private static final String TABLE_NAME = "mrtest";
+  private static final byte[] FAM = Bytes.toBytes("contents");
+  private final HTableDescriptor desc;
+  private HTable loadedTable;
+
+  public TestOnlineMerge() {
+    super(1);
+    this.conf.setInt("hbase.client.retries.number", 3);
+    this.conf.setInt("hbase.client.pause", 1000);
+    desc = new HTableDescriptor(TABLE_NAME);
+    desc.addFamily(new HColumnDescriptor(FAM));
+    desc.setMaxFileSize(1024*32);
+    desc.setMemStoreFlushSize(1024*16);
+  }
+
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    HBaseAdmin admin = new HBaseAdmin(conf);
+    admin.createTable(desc);
+    loadedTable = new HTable(TABLE_NAME);
+    fillTable(loadedTable, 1);
+  }
+
+  public void testOnlineMerge() throws Exception {
+    OnlineMerge merger = new OnlineMerge(this.conf);
+    HTable meta_local = new HTable(HConstants.META_TABLE_NAME);
+
+    List<HRegionInfo> hirs;
+    // Regions continue to split while we're here.
+    // Wait until it's stable.
+    do {
+      hirs = merger.getListOfMetaRows(meta_local, Bytes.toBytes(TABLE_NAME));
+      Thread.sleep(200);
+    } while (hirs.size() < 9);
+
+    for (int i = 0; i < hirs.size()-1; i+=2) {
+      int errCode = ToolRunner.run(merger,
+          new String[] {this.desc.getNameAsString(),
+              hirs.get(i).getRegionNameAsString(),
+              hirs.get(i+1).getRegionNameAsString()}
+      );
+      if (errCode != 0) {
+        fail("ToolRunner didn't return 0, see logs");
+      }
+      Get get = new Get(hirs.get(i+1).getEndKey());
+      Result res = loadedTable.get(get);
+      assertEquals(1, res.size());
+    }
+
+  }
+
+  // Creates about 9 regions
+  private void fillTable(HTable table, int iteration) throws Exception {
+    byte[] k = new byte[7];
+    // Illustrative load (exact keys and values are not critical): write
+    // enough small rows under FAM that the 32KB max file size forces the
+    // table to split into roughly nine regions.
+    for (int i = 0; i < iteration; i++) {
+      for (int j = 0; j < 10000; j++) {
+        k = Bytes.toBytes(String.format("%07d", j));
+        Put put = new Put(k);
+        put.add(FAM, Bytes.toBytes("q"), k);
+        table.put(put);
+      }
+    }
+  }
+}
Index: src/main/java/org/apache/hadoop/hbase/util/OnlineMerge.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/util/OnlineMerge.java	(revision 0)
+++ src/main/java/org/apache/hadoop/hbase/util/OnlineMerge.java	(revision 0)
@@ -0,0 +1,346 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tool that merges pairs of regions of a live table: the regions are marked
+ * offline in .META., closed on their region server, merged on disk, and the
+ * resulting region is added back to .META. and reassigned.
+ */
+public class OnlineMerge extends Configured implements Tool {
+  private static final Log LOG = LogFactory.getLog(OnlineMerge.class);
+
+  private final Configuration conf;
+  private byte [] tableName;
+  private byte [] region1;
+  private byte [] region2;
+  private boolean isMetaTable;
+  private HRegionInfo mergeInfo;
+  private MetaUtils utils;
+
+  public OnlineMerge() {
+    this(HBaseConfiguration.create());
+  }
+
+  public OnlineMerge(Configuration conf) {
+    super(conf);
+    this.conf = conf;
+  }
+
+  public int run(String[] args) throws Exception {
+    if (parseArgs(args) != 0) {
+      return -1;
+    }
+    // MetaUtils supplies the WAL used when opening the regions to merge
+    this.utils = new MetaUtils(conf);
+    HTable meta = new HTable(conf, HConstants.META_TABLE_NAME);
+
+    List<HRegionInfo> hris;
+    if (region1 == null) {
+      hris = getListOfMetaRows(meta, tableName);
+      if (hris.size() < 2) {
+        throw new IOException("The table doesn't have 2 or more regions");
+      }
+    } else {
+      hris = new ArrayList<HRegionInfo>(2);
+      Get get = new Get(region1);
+      get.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
+      Result res = meta.get(get);
+      HRegionInfo info1 = Writables.getHRegionInfo((res == null)? null:
+          res.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER));
+      if (info1 == null) {
+        throw new NullPointerException("info1 is null using key " +
+            Bytes.toString(region1));
+      }
+
+      get = new Get(region2);
+      get.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
+      Result res2 = meta.get(get);
+      HRegionInfo info2 = Writables.getHRegionInfo((res2 == null)? null:
+          res2.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER));
+      if (info2 == null) {
+        throw new NullPointerException("info2 is null using key " +
+            Bytes.toString(region2));
+      }
+      hris.add(info1);
+      hris.add(info2);
+    }
+    HTable mergedTable = new HTable(conf, tableName);
+
+    for (int i = 0; i < hris.size() - 1; i += 2) {
+      HRegionInfo info1 = hris.get(i);
+      HRegionInfo info2 = hris.get(i + 1);
+      LOG.info("Merging regions " + info1.getRegionNameAsString() + " and " +
+          info2.getRegionNameAsString() + " in table " + Bytes.toString(this.tableName));
+      Get get = new Get(hris.get(i).getEndKey());
+      // This puts the location in our local cache so that when we do a get
+      // we go directly to the RS
+      mergedTable.get(get);
+
+      HBaseAdmin admin = new HBaseAdmin(conf);
+      offlineRegion(info1, meta);
+      offlineRegion(info2, meta);
+      admin.closeRegion(info1.getRegionName(), null);
+      admin.closeRegion(info2.getRegionName(), null);
+
+      LOG.info("Making sure region1 is down");
+      HRegionLocation loc1 = mergedTable.getConnection().
+          getRegionLocation(tableName, info1.getEndKey(), false);
+      while (true) {
+        try {
+          mergedTable.getConnection().getHRegionConnection(
+              loc1.getHostname(), loc1.getPort()).get(info1.getRegionName(), get);
+          LOG.info("Waiting a bit until it's closed");
+          try {
+            Thread.sleep(100);
+          } catch (InterruptedException e) {
+            LOG.error("Interrupted during the sleep", e);
+          }
+        } catch (RemoteException ex) {
+          IOException ioe = ex.unwrapRemoteException();
+          if (ioe instanceof NotServingRegionException ||
+              ioe.getCause() instanceof NotServingRegionException) {
+            break;
+          }
+        }
+      }
+
+      LOG.info("Making sure region2 is down");
+      HRegionLocation loc2 = mergedTable.getConnection().
+          getRegionLocation(tableName, info2.getEndKey(), false);
+      while (true) {
+        try {
+          mergedTable.getConnection().getHRegionConnection(
+              loc2.getHostname(), loc2.getPort()).get(info2.getRegionName(), get);
+          LOG.info("Waiting a bit until it's closed");
+          try {
+            Thread.sleep(100);
+          } catch (InterruptedException e) {
+            LOG.error("Interrupted during the sleep", e);
+          }
+        } catch (RemoteException ex) {
+          IOException ioe = ex.unwrapRemoteException();
+          if (ioe instanceof NotServingRegionException ||
+              ioe.getCause() instanceof NotServingRegionException) {
+            break;
+          }
+        }
+      }
+
+      HRegion merged = merge(info1, info2, meta);
+
+      // Now add the newly merged region to .META.
+      LOG.info("Adding " + merged.getRegionInfo() + " to .META.");
+
+      Put put = new Put(merged.getRegionName());
+      put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
+          Writables.getBytes(merged.getRegionInfo()));
+      meta.put(put);
+      merged.close();
+      admin.assign(merged.getRegionName(), false);
+    }
+    return 0;
+  }
+
+  /**
+   * Get the list of HRIs in a table.
+   * @param meta HTable to the .META. table
+   * @param tableName_local name of the table whose regions to list
+   * @return list of HRIs
+   * @throws IOException
+   */
+  List<HRegionInfo> getListOfMetaRows(HTable meta, byte [] tableName_local)
+  throws IOException {
+    List<HRegionInfo> hris = new ArrayList<HRegionInfo>();
+    Scan scan = new Scan();
+    ResultScanner resScan = meta.getScanner(scan);
+    for (Result res : resScan) {
+      HRegionInfo hri =
+          Writables.getHRegionInfo(res.getValue(HConstants.CATALOG_FAMILY,
+              HConstants.REGIONINFO_QUALIFIER));
+      if (hri.isOffline() || hri.isSplit()) {
+        LOG.info("Region split or offline " + hri.getRegionNameAsString());
+        continue;
+      }
+      if (Bytes.equals(hri.getTableName(), tableName_local)) {
+        hris.add(hri);
+        LOG.info(hri.getRegionNameAsString());
+      }
+    }
+    return hris;
+  }
+
+  // Mark this region offline in META so that we don't reassign it
+  private void offlineRegion(HRegionInfo r, HTable meta) throws IOException {
+    r.setOffline(true);
+    Put p = new Put(r.getRegionName());
+    p.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
+        Writables.getBytes(r));
+    meta.put(p);
+  }
+
+  /*
+   * Actually merge two regions and update their info in .META.
+   * Returns the HRegion object for the newly merged region.
+   */
+  private HRegion merge(HRegionInfo info1, HRegionInfo info2, HTable meta)
+  throws IOException {
+    HRegion merged = null;
+    HLog log = utils.getLog();
+    HTable mergedTable = new HTable(conf, tableName);
+    HTableDescriptor htd = mergedTable.getTableDescriptor();
+    HRegion r1 = HRegion.openHRegion(info1, htd, log, this.conf);
+    try {
+      HRegion r2 = HRegion.openHRegion(info2, htd, log, this.conf);
+      try {
+        merged = HRegion.merge(r1, r2);
+      } finally {
+        if (!r2.isClosed()) {
+          r2.close();
+        }
+      }
+    } finally {
+      if (!r1.isClosed()) {
+        r1.close();
+      }
+    }
+
+    // Remove the old regions from meta.
+    // HRegion.merge has already deleted their files.
+
+    removeRegionFromMeta(meta, info1);
+    removeRegionFromMeta(meta, info2);
+
+    this.mergeInfo = merged.getRegionInfo();
+    return merged;
+  }
+
+  /*
+   * Removes a region's meta information from the passed meta
+   * region.
+   *
+   * @param meta META HTable to be updated
+   * @param regioninfo HRegionInfo of region to remove from meta
+   *
+   * @throws IOException
+   */
+  private void removeRegionFromMeta(HTable meta, HRegionInfo regioninfo)
+  throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Removing region: " + regioninfo + " from " + meta);
+    }
+
+    Delete delete = new Delete(regioninfo.getRegionName(),
+        System.currentTimeMillis(), null);
+    meta.delete(delete);
+  }
+
+  /*
+   * Parses the command line arguments: a table name, optionally followed
+   * by the names of the two regions to merge.
+   *
+   * @param args command line arguments
+   * @return 0 if the arguments are valid, -1 otherwise
+   */
+  private int parseArgs(String[] args) {
+    GenericOptionsParser parser =
+        new GenericOptionsParser(this.getConf(), args);
+
+    String[] remainingArgs = parser.getRemainingArgs();
+    if (remainingArgs.length != 1 && remainingArgs.length != 3) {
+      usage();
+      return -1;
+    }
+    tableName = Bytes.toBytes(remainingArgs[0]);
+    isMetaTable = Bytes.compareTo(tableName, HConstants.META_TABLE_NAME) == 0;
+    int status = 0;
+
+    if (remainingArgs.length > 1) {
+      region1 = Bytes.toBytesBinary(remainingArgs[1]);
+      region2 = Bytes.toBytesBinary(remainingArgs[2]);
+      if (notInTable(tableName, region1) || notInTable(tableName, region2)) {
+        status = -1;
+      } else if (Bytes.equals(region1, region2)) {
+        LOG.error("Can't merge a region with itself");
+        status = -1;
+      }
+    }
+    return status;
+  }
+
+  private boolean notInTable(final byte [] tn, final byte [] rn) {
+    if (WritableComparator.compareBytes(tn, 0, tn.length, rn, 0, tn.length) != 0) {
+      LOG.error("Region " + Bytes.toString(rn) + " does not belong to table " +
+          Bytes.toString(tn));
+      return true;
+    }
+    return false;
+  }
+
+  private void usage() {
+    System.err.println(
+        "Usage: bin/hbase onlinemerge <table-name> [<region-name-1> <region-name-2>]\n");
+  }
+
+  /**
+   * Main program
+   *
+   * @param args command line arguments
+   */
+  public static void main(String[] args) {
+    int status = 0;
+    try {
+      status = ToolRunner.run(new OnlineMerge(), args);
+    } catch (Exception e) {
+      LOG.error("exiting due to error", e);
+      status = -1;
+    }
+    System.exit(status);
+  }
+}
\ No newline at end of file
Index: bin/hbase
===================================================================
--- bin/hbase	(revision 1143603)
+++ bin/hbase	(working copy)
@@ -261,6 +261,8 @@
 elif [ "$COMMAND" = "classpath" ] ; then
   echo $CLASSPATH
   exit 0
+elif [ "$COMMAND" = "onlinemerge" ] ; then
+  CLASS='org.apache.hadoop.hbase.util.OnlineMerge'
 elif [ "$COMMAND" = "version" ] ; then
   CLASS='org.apache.hadoop.hbase.util.VersionInfo'
 else