Index: src/main/java/org/apache/hadoop/hbase/HServerLoad.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/HServerLoad.java (revision 1069743) +++ src/main/java/org/apache/hadoop/hbase/HServerLoad.java (working copy) @@ -65,6 +65,8 @@ private int memstoreSizeMB; /** the current total size of storefile indexes for the region, in MB */ private int storefileIndexSizeMB; + /** the current total request made to region */ + private long requestsCount; /** * Constructor, for Writable @@ -80,16 +82,18 @@ * @param storefileSizeMB * @param memstoreSizeMB * @param storefileIndexSizeMB + * @param requestsCount */ public RegionLoad(final byte[] name, final int stores, final int storefiles, final int storefileSizeMB, - final int memstoreSizeMB, final int storefileIndexSizeMB) { + final int memstoreSizeMB, final int storefileIndexSizeMB,final long requestsCount) { this.name = name; this.stores = stores; this.storefiles = storefiles; this.storefileSizeMB = storefileSizeMB; this.memstoreSizeMB = memstoreSizeMB; this.storefileIndexSizeMB = storefileIndexSizeMB; + this.requestsCount = requestsCount; } // Getters @@ -142,6 +146,13 @@ public int getStorefileIndexSizeMB() { return storefileIndexSizeMB; } + + /** + * @return the number of requests made to region + */ + public long getRequestsCount() { + return requestsCount; + } // Setters @@ -181,6 +192,13 @@ this.storefileIndexSizeMB = storefileIndexSizeMB; } + /** + * @param requestsCount the number of requests to region + */ + public void setRequestsCount(long requestsCount) { + this.requestsCount = requestsCount; + } + // Writable public void readFields(DataInput in) throws IOException { int namelen = in.readInt(); @@ -191,6 +209,7 @@ this.storefileSizeMB = in.readInt(); this.memstoreSizeMB = in.readInt(); this.storefileIndexSizeMB = in.readInt(); + this.requestsCount = in.readLong(); } public void write(DataOutput out) throws IOException { @@ -201,6 +220,7 @@ 
out.writeInt(storefileSizeMB); out.writeInt(memstoreSizeMB); out.writeInt(storefileIndexSizeMB); + out.writeLong(requestsCount); } /** @@ -218,6 +238,8 @@ Integer.valueOf(this.memstoreSizeMB)); sb = Strings.appendKeyValue(sb, "storefileIndexSizeMB", Integer.valueOf(this.storefileIndexSizeMB)); + sb = Strings.appendKeyValue(sb, "requestsCount", + Long.valueOf(this.requestsCount)); return sb.toString(); } } @@ -452,14 +474,16 @@ * @param storefiles * @param memstoreSizeMB * @param storefileIndexSizeMB + * @param requestsCount * @deprecated Use {@link #addRegionInfo(RegionLoad)} */ @Deprecated public void addRegionInfo(final byte[] name, final int stores, final int storefiles, final int storefileSizeMB, - final int memstoreSizeMB, final int storefileIndexSizeMB) { + final int memstoreSizeMB, final int storefileIndexSizeMB, + final long requestsCount) { this.regionLoad.add(new HServerLoad.RegionLoad(name, stores, storefiles, - storefileSizeMB, memstoreSizeMB, storefileIndexSizeMB)); + storefileSizeMB, memstoreSizeMB, storefileIndexSizeMB, requestsCount)); } // Writable Index: src/main/java/org/apache/hadoop/hbase/util/OnlineMerge.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/util/OnlineMerge.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/util/OnlineMerge.java (revision 0) @@ -0,0 +1,400 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.util; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.MasterNotRunningException; +import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.RemoteExceptionHandler; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.regionserver.wal.HLog; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.io.WritableComparator; +import org.apache.hadoop.util.GenericOptionsParser; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.hadoop.ipc.RemoteException; + +/** + * Utility that can merge any two regions in the same table: 
adjacent, + * overlapping or disjoint. It can also merge all regions, two by two. + */ +public class OnlineMerge extends Configured implements Tool { + static final Log LOG = LogFactory.getLog(OnlineMerge.class); + private final Configuration conf; + private volatile MetaUtils utils; + private byte [] tableName; // Name of table + private volatile byte [] region1; // Name of region 1 + private volatile byte [] region2; // Name of region 2 + private volatile boolean isMetaTable; + private volatile HRegionInfo mergeInfo; + + /** default constructor */ + public OnlineMerge() { + this(HBaseConfiguration.create()); + } + + /** + * @param conf + */ + public OnlineMerge(Configuration conf) { + super(conf); + this.conf = conf; + this.conf.setInt("hbase.client.retries.number", 3); + this.conf.setInt("hbase.client.pause", 1000); + this.mergeInfo = null; + } + + public int run(String[] args) throws Exception { + if (parseArgs(args) != 0) { + return -1; + } + + // Verify file system is up. + FileSystem fs = FileSystem.get(this.conf); // get DFS handle + LOG.info("Verifying that file system is available..."); + try { + FSUtils.checkFileSystemAvailable(fs); + } catch (IOException e) { + LOG.fatal("File system is not available", e); + return -1; + } + + // Verify HBase is up + LOG.info("Verifying that HBase is running..."); + try { + HBaseAdmin.checkHBaseAvailable(conf); + } catch (MasterNotRunningException e) { + LOG.fatal("HBase cluster must be on-line."); + return -1; + } + + // Initialize MetaUtils and get the root of the HBase installation + + this.utils = new MetaUtils(conf); + try { + if (isMetaTable) { + throw new Exception ("Can't merge meta tables online"); + } else { + mergeRegions(); + } + return 0; + } catch (Exception e) { + LOG.fatal("Merge failed", e); + return -1; + + } finally { + if (this.utils != null) { + this.utils.shutdown(); + } + } + } + + /* + * Merges two regions from a user table.
+ */ + private void mergeRegions() throws IOException { + HTable meta = new HTable(HConstants.META_TABLE_NAME); + List hris; + if (region1 == null) { + hris = getListOfMetaRows(meta,tableName); + if (hris.size() < 2) { + throw new IOException("The table doesn't have 2 or more regions"); + } + } else { + hris = new ArrayList(2); + Get get = new Get(region1); + get.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER); + Result res = meta.get(get); + HRegionInfo info1 = Writables.getHRegionInfo((res == null)? null: + res.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER)); + if (info1== null) { + throw new NullPointerException("info1 is null using key " + + Bytes.toString(region1)); + } + + get = new Get(region2); + get.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER); + Result res2 = meta.get(get); + HRegionInfo info2 = Writables.getHRegionInfo((res2 == null)? null: + res2.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER)); + if (info2 == null) { + throw new NullPointerException("info2 is null using key "); + } + hris.add(info1); + hris.add(info2); + } + HTable mergedTable = new HTable(tableName); + + for(int i = 0; i < hris.size() - 1; i += 2) { + HRegionInfo info1 = hris.get(i); + HRegionInfo info2 = hris.get(i + 1); + LOG.info("Merging regions " + info1.getRegionNameAsString() + " and " + + info2.getRegionNameAsString() + " in table " + Bytes.toString(this.tableName)); + // Scan the root region for all the meta regions that contain the regions + // we're merging.
+ Get get = new Get(hris.get(i).getEndKey()); + // This puts the location in our local cache so that when we do a get + // we go directly to the RS + mergedTable.get(get); + + HBaseAdmin admin = new HBaseAdmin(conf); + offlineRegion(info1, meta); + offlineRegion(info2, meta); + admin.closeRegion(info1.getRegionName(),null); + admin.closeRegion(info2.getRegionName(),null); + + LOG.info("Making sure the region1 is down"); + HRegionLocation loc1 = mergedTable.getConnection(). + getRegionLocation(tableName, info1.getEndKey(), false); + while (true) { + try { + mergedTable.getConnection().getHRegionConnection( + loc1.getServerAddress()).get(info1.getRegionName(), get); + LOG.info("Waiting a bit until it's closed"); + try { + Thread.sleep(100); + } catch (InterruptedException e) { + LOG.error("Interrupted during the sleep", e); + } + } catch (RemoteException ex) { + IOException ioe = RemoteExceptionHandler.decodeRemoteException(ex); + if (ioe instanceof NotServingRegionException || + ioe.getCause() instanceof NotServingRegionException) { + break; + } + } + } + + LOG.info("Making sure the region2 is down"); + HRegionLocation loc2 = mergedTable.getConnection(). 
+ getRegionLocation(tableName, info2.getEndKey(), false); + while (true) { + try { + mergedTable.getConnection().getHRegionConnection( + loc2.getServerAddress()).get(info2.getRegionName(), get); + LOG.info("Waiting a bit until it's closed"); + try { + Thread.sleep(100); + } catch (InterruptedException e) { + LOG.error("Interrupted during the sleep", e); + } + } catch (RemoteException ex) { + IOException ioe = RemoteExceptionHandler.decodeRemoteException(ex); + if (ioe instanceof NotServingRegionException || + ioe.getCause() instanceof NotServingRegionException) { + break; + } + } + } + + HRegion merged = merge(info1, info2, meta); + + // Now find the meta region which will contain the newly merged region + LOG.info("Adding " + merged.getRegionInfo() + " to " + + ".META."); + + Put put = new Put(merged.getRegionName()); + put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER, + Writables.getBytes(merged.getRegionInfo())); + meta.put(put); + merged.close(); + admin.assign(merged.getRegionName(),false); + } + } + + /** + * Get the list of a HRIs in a table + * @param meta HTable to the .META. 
table + * @return list of hris + * @throws IOException + */ + List getListOfMetaRows(HTable meta,byte [] tableName_local) + throws IOException { + List hris = new ArrayList(); + Scan scan = new Scan(); + ResultScanner resScan = meta.getScanner(scan); + for (Result res : resScan) { + HRegionInfo hri = + Writables.getHRegionInfo(res.getValue(HConstants.CATALOG_FAMILY, + HConstants.REGIONINFO_QUALIFIER)); + if (hri.isOffline() || hri.isSplit()) { + LOG.info("Region split or offline " + hri.getRegionNameAsString()); + continue; + } + if (Bytes.equals(hri.getTableDesc().getName(), tableName_local)) { + hris.add(hri); + LOG.info(hri.getRegionNameAsString()); + } + } + return hris; + } + + // Mark this region offline in META so that we don't reassign it + private void offlineRegion(HRegionInfo r, HTable meta) throws IOException { + r.setOffline(true); + Put p = new Put(r.getRegionName()); + p.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER, Writables.getBytes(r)); + meta.put(p); + } + + /* + * Actually merge two regions and update their info in the meta region(s) + * If the meta is split, meta1 may be different from meta2. (and we may have + * to scan the meta if the resulting merged region does not go in either) + * Returns HRegion object for newly merged region + */ + private HRegion merge(HRegionInfo info1, HRegionInfo info2, HTable meta) + throws IOException { + HRegion merged = null; + HLog log = utils.getLog(); + HRegion r1 = HRegion.openHRegion(info1, log, this.conf); + try { + HRegion r2 = HRegion.openHRegion(info2, log, this.conf); + try { + merged = HRegion.merge(r1, r2); + } finally { + if (!r2.isClosed()) { + r2.close(); + } + } + } finally { + if (!r1.isClosed()) { + r1.close(); + } + } + + // Remove the old regions from meta. 
+ // HRegion.merge has already deleted their files + + removeRegionFromMeta(meta, info1); + removeRegionFromMeta(meta, info2); + + this.mergeInfo = merged.getRegionInfo(); + return merged; + } + + /* + * Removes a region's meta information from the passed meta + * region. + * + * @param meta META HRegion to be updated + * @param regioninfo HRegionInfo of region to remove from meta + * + * @throws IOException + */ + private void removeRegionFromMeta(HTable meta, HRegionInfo regioninfo) + throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Removing region: " + regioninfo + " from " + meta); + } + + Delete delete = new Delete(regioninfo.getRegionName(), + System.currentTimeMillis(), null); + meta.delete(delete); + } + + /* + * Adds a region's meta information from the passed meta + * region. + * + * @param metainfo META HRegionInfo to be updated + * @param region HRegion to add to meta + * + * @throws IOException + */ + private int parseArgs(String[] args) { + GenericOptionsParser parser = + new GenericOptionsParser(this.getConf(), args); + + String[] remainingArgs = parser.getRemainingArgs(); + if (remainingArgs.length > 3 || remainingArgs.length == 0) { + usage(); + return -1; + } + tableName = Bytes.toBytes(remainingArgs[0]); + isMetaTable = Bytes.compareTo(tableName, HConstants.META_TABLE_NAME) == 0; + int status = 0; + + if (remainingArgs.length > 1) { + region1 = Bytes.toBytesBinary(remainingArgs[1]); + region2 = Bytes.toBytesBinary(remainingArgs[2]); + if (notInTable(tableName, region1) || notInTable(tableName, region2)) { + status = -1; + } else if (Bytes.equals(region1, region2)) { + LOG.error("Can't merge a region with itself"); + status = -1; + } + } + return status; + } + + private boolean notInTable(final byte [] tn, final byte [] rn) { + if (WritableComparator.compareBytes(tn, 0, tn.length, rn, 0, tn.length) != 0) { + LOG.error("Region " + Bytes.toString(rn) + " does not belong to table " + + Bytes.toString(tn)); + return true; + } + return 
false; + } + + private void usage() { + System.err.println( + "Usage: bin/hbase onlinemerge <tableName> [<regionName1> <regionName2>]\n"); + } + + /** + * Main program + * + * @param args + */ + public static void main(String[] args) { + int status = 0; + try { + status = ToolRunner.run(new OnlineMerge(), args); + } catch (Exception e) { + LOG.error("exiting due to error", e); + status = -1; + } + System.exit(status); + } +} \ No newline at end of file Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1069743) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -43,6 +43,7 @@ import java.util.TreeSet; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -173,7 +174,7 @@ // private byte [] name = null; final AtomicLong memstoreSize = new AtomicLong(0); - + final AtomicLong requestsCount = new AtomicLong(0); /** * The directory for the table this region is part of. * This directory contains the directory for this region.
@@ -3370,7 +3371,7 @@ } public static final long FIXED_OVERHEAD = ClassSize.align( - (4 * Bytes.SIZEOF_LONG) + ClassSize.ARRAY + + (5 * Bytes.SIZEOF_LONG) + ClassSize.ARRAY + (24 * ClassSize.REFERENCE) + ClassSize.OBJECT + Bytes.SIZEOF_INT); public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD + @@ -3634,6 +3635,7 @@ throw new NotServingRegionException(regionInfo.getRegionNameAsString() + " is closed"); } + this.requestsCount.incrementAndGet(); } /** Index: src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java (revision 1069743) +++ src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java (working copy) @@ -128,6 +128,11 @@ public final MetricsIntValue storefiles = new MetricsIntValue("storefiles", registry); /** + * Count of requests made to the regions on this regionserver.
+ */ + public final MetricsLongValue requestsCount = new MetricsLongValue("requestsCount", registry); + + /** * Sum of all the storefile index sizes in this regionserver in MB */ public final MetricsIntValue storefileIndexSizeMB = @@ -243,6 +248,7 @@ this.storefiles.pushMetric(this.metricsRecord); this.storefileIndexSizeMB.pushMetric(this.metricsRecord); this.memstoreSizeMB.pushMetric(this.metricsRecord); + this.requestsCount.pushMetric(this.metricsRecord); this.regions.pushMetric(this.metricsRecord); this.requests.pushMetric(this.metricsRecord); this.compactionQueueSize.pushMetric(this.metricsRecord); @@ -345,6 +351,8 @@ Integer.valueOf(this.storefileIndexSizeMB.get())); sb = Strings.appendKeyValue(sb, "memstoreSize", Integer.valueOf(this.memstoreSizeMB.get())); + sb = Strings.appendKeyValue(sb, "requestsCount", + Long.valueOf(this.requestsCount.get())); sb = Strings.appendKeyValue(sb, "compactionQueueSize", Integer.valueOf(this.compactionQueueSize.get())); sb = Strings.appendKeyValue(sb, "flushQueueSize", Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1069743) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -906,6 +906,7 @@ int storefileSizeMB = 0; int memstoreSizeMB = (int) (r.memstoreSize.get() / 1024 / 1024); int storefileIndexSizeMB = 0; + long requestsCount = r.requestsCount.get(); synchronized (r.stores) { stores += r.stores.size(); for (Store store : r.stores.values()) { @@ -914,8 +915,8 @@ storefileIndexSizeMB += (int) (store.getStorefilesIndexSize() / 1024 / 1024); } } - return new HServerLoad.RegionLoad(name, stores, storefiles, - storefileSizeMB, memstoreSizeMB, storefileIndexSizeMB); + return new HServerLoad.RegionLoad(name,stores, storefiles, + storefileSizeMB, memstoreSizeMB, storefileIndexSizeMB, requestsCount); } /** @@ 
-1154,11 +1155,13 @@ int stores = 0; int storefiles = 0; long memstoreSize = 0; + long requestsCount = 0; long storefileIndexSize = 0; synchronized (this.onlineRegions) { for (Map.Entry e : this.onlineRegions.entrySet()) { HRegion r = e.getValue(); memstoreSize += r.memstoreSize.get(); + requestsCount+= r.requestsCount.get(); synchronized (r.stores) { stores += r.stores.size(); for (Map.Entry ee : r.stores.entrySet()) { @@ -1172,6 +1175,7 @@ this.metrics.stores.set(stores); this.metrics.storefiles.set(storefiles); this.metrics.memstoreSizeMB.set((int) (memstoreSize / (1024 * 1024))); + this.metrics.requestsCount.set(requestsCount); this.metrics.storefileIndexSizeMB .set((int) (storefileIndexSize / (1024 * 1024))); this.metrics.compactionQueueSize.set(compactSplitThread