diff --git src/main/java/org/apache/hadoop/hbase/HServerLoad.java src/main/java/org/apache/hadoop/hbase/HServerLoad.java index 6c1d33d..42deb5f 100644 --- src/main/java/org/apache/hadoop/hbase/HServerLoad.java +++ src/main/java/org/apache/hadoop/hbase/HServerLoad.java @@ -89,6 +89,10 @@ implements WritableComparable { private int readRequestsCount; /** the current total write requests made to region */ private int writeRequestsCount; + /** the total compacting key values in currently running compaction */ + private long totalCompactingKVs; + /** the completed count of key values in currently running compaction */ + private long currentCompactedKVs; /** * Constructor, for Writable @@ -101,17 +105,21 @@ implements WritableComparable { * @param name * @param stores * @param storefiles + * @param storeUncompressedSizeMB * @param storefileSizeMB * @param memstoreSizeMB * @param storefileIndexSizeMB * @param readRequestsCount * @param writeRequestsCount + * @param totalCompactingKVs + * @param currentCompactedKVs */ public RegionLoad(final byte[] name, final int stores, final int storefiles, final int storeUncompressedSizeMB, final int storefileSizeMB, final int memstoreSizeMB, final int storefileIndexSizeMB, - final int readRequestsCount, final int writeRequestsCount) { + final int readRequestsCount, final int writeRequestsCount, + final long totalCompactingKVs, final long currentCompactedKVs) { this.name = name; this.stores = stores; this.storefiles = storefiles; @@ -121,6 +129,8 @@ implements WritableComparable { this.storefileIndexSizeMB = storefileIndexSizeMB; this.readRequestsCount = readRequestsCount; this.writeRequestsCount = writeRequestsCount; + this.totalCompactingKVs = totalCompactingKVs; + this.currentCompactedKVs = currentCompactedKVs; } // Getters @@ -173,7 +183,7 @@ implements WritableComparable { public int getStorefileIndexSizeMB() { return storefileIndexSizeMB; } - + /** * @return the number of requests made to region */ @@ -195,6 +205,20 @@ implements 
WritableComparable { return writeRequestsCount; } + /** + * @return the total number of kvs in current compaction + */ + public long getTotalCompactingKVs() { + return totalCompactingKVs; + } + + /** + * @return the number of already compacted kvs in current compaction + */ + public long getCurrentCompactedKVs() { + return currentCompactedKVs; + } + // Setters /** @@ -247,6 +271,21 @@ implements WritableComparable { this.writeRequestsCount = requestsCount; } + /** + * @param totalCompactingKVs the number of kvs total in current compaction + */ + public void setTotalCompactingKVs(long totalCompactingKVs) { + this.totalCompactingKVs = totalCompactingKVs; + } + + /** + * @param currentCompactedKVs the number of kvs already compacted in + * current compaction + */ + public void setCurrentCompactedKVs(long currentCompactedKVs) { + this.currentCompactedKVs = currentCompactedKVs; + } + // Writable public void readFields(DataInput in) throws IOException { super.readFields(in); @@ -263,6 +302,8 @@ implements WritableComparable { this.storefileIndexSizeMB = in.readInt(); this.readRequestsCount = in.readInt(); this.writeRequestsCount = in.readInt(); + this.totalCompactingKVs = in.readLong(); + this.currentCompactedKVs = in.readLong(); } public void write(DataOutput out) throws IOException { @@ -278,6 +319,8 @@ implements WritableComparable { out.writeInt(storefileIndexSizeMB); out.writeInt(readRequestsCount); out.writeInt(writeRequestsCount); + out.writeLong(totalCompactingKVs); + out.writeLong(currentCompactedKVs); } /** @@ -296,7 +339,7 @@ implements WritableComparable { if (this.storeUncompressedSizeMB != 0) { sb = Strings.appendKeyValue(sb, "compressionRatio", String.format("%.4f", (float)this.storefileSizeMB/ - (float)this.storeUncompressedSizeMB)); + (float)this.storeUncompressedSizeMB)); } sb = Strings.appendKeyValue(sb, "memstoreSizeMB", Integer.valueOf(this.memstoreSizeMB)); @@ -306,6 +349,17 @@ implements WritableComparable { Long.valueOf(this.readRequestsCount)); sb =
Strings.appendKeyValue(sb, "writeRequestsCount", Long.valueOf(this.writeRequestsCount)); + sb = Strings.appendKeyValue(sb, "totalCompactingKVs", + Long.valueOf(this.totalCompactingKVs)); + sb = Strings.appendKeyValue(sb, "currentCompactedKVs", + Long.valueOf(this.currentCompactedKVs)); + float compactionProgressPct = Float.NaN; + if( this.totalCompactingKVs > 0 ) { + compactionProgressPct = (float) this.currentCompactedKVs / + this.totalCompactingKVs; + } + sb = Strings.appendKeyValue(sb, "compactionProgressPct", + compactionProgressPct); return sb.toString(); } } diff --git src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 977115d..23f1cfa 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -109,6 +109,7 @@ import org.apache.hadoop.hbase.ipc.Invocation; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler; import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler; import org.apache.hadoop.hbase.regionserver.handler.CloseRootHandler; @@ -628,9 +629,9 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, closeUserRegions(this.abortRequested); } else if (this.stopping) { LOG.info("Stopping meta regions, if the HRegionServer hosts any"); - + boolean allUserRegionsOffline = areAllUserRegionsOffline(); - + if (allUserRegionsOffline) { // Set stopped if no requests since last time we went around the loop. // The remaining meta regions will be closed on our way out. 
@@ -920,6 +921,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, int storefileSizeMB = 0; int memstoreSizeMB = (int) (r.memstoreSize.get() / 1024 / 1024); int storefileIndexSizeMB = 0; + long totalCompactingKVs = 0; + long currentCompactedKVs = 0; synchronized (r.stores) { stores += r.stores.size(); for (Store store : r.stores.values()) { @@ -928,12 +931,18 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, / 1024 / 1024); storefileSizeMB += (int) (store.getStorefilesSize() / 1024 / 1024); storefileIndexSizeMB += (int) (store.getStorefilesIndexSize() / 1024 / 1024); + CompactionProgress progress = store.getCompactionProgress(); + if( progress != null ) { + totalCompactingKVs += progress.totalCompactingKVs; + currentCompactedKVs += progress.currentCompactedKVs; + } } } return new HServerLoad.RegionLoad(name,stores, storefiles, storeUncompressedSizeMB, storefileSizeMB, memstoreSizeMB, storefileIndexSizeMB, - (int) r.readRequestsCount.get(), (int) r.writeRequestsCount.get()); + (int) r.readRequestsCount.get(), (int) r.writeRequestsCount.get(), + totalCompactingKVs, currentCompactedKVs); } /** @@ -1057,13 +1066,13 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, super("CompactionChecker", sleepTime, h); this.instance = h; LOG.info("Runs every " + StringUtils.formatTime(sleepTime)); - + /* MajorCompactPriority is configurable. * If not set, the compaction will use default priority. */ this.majorCompactPriority = this.instance.conf. 
getInt("hbase.regionserver.compactionChecker.majorCompactPriority", - DEFAULT_PRIORITY); + DEFAULT_PRIORITY); } @Override @@ -1078,14 +1087,14 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, this.instance.compactSplitThread.requestCompaction(r, s, getName() + " requests compaction"); } else if (s.isMajorCompaction()) { - if (majorCompactPriority == DEFAULT_PRIORITY || + if (majorCompactPriority == DEFAULT_PRIORITY || majorCompactPriority > r.getCompactPriority()) { this.instance.compactSplitThread.requestCompaction(r, s, getName() + " requests major compaction; use default priority"); } else { this.instance.compactSplitThread.requestCompaction(r, s, getName() + " requests major compaction; use configured priority", - this.majorCompactPriority); + this.majorCompactPriority); } } } catch (IOException e) { @@ -1321,7 +1330,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, while (true) { try { this.infoServer = new InfoServer("regionserver", addr, port, false, this.conf); - this.infoServer.addServlet("status", "/rs-status", RSStatusServlet.class); + this.infoServer.addServlet("status", "/rs-status", RSStatusServlet.class); this.infoServer.setAttribute(REGIONSERVER, this); this.infoServer.start(); break; @@ -1812,7 +1821,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, + "regionName is null"); } HRegion region = getRegion(regionName); - Integer lock = getLockFromId(put.getLockId()); + Integer lock = getLockFromId(put.getLockId()); if (region.getCoprocessorHost() != null) { Boolean result = region.getCoprocessorHost() .preCheckAndPut(row, family, qualifier, compareOp, comparator, put); @@ -1851,7 +1860,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, + "regionName is null"); } HRegion region = getRegion(regionName); - Integer lock = getLockFromId(delete.getLockId()); + Integer lock = getLockFromId(delete.getLockId()); 
WritableByteArrayComparable comparator = new BinaryComparator(value); if (region.getCoprocessorHost() != null) { Boolean result = region.getCoprocessorHost().preCheckAndDelete(row, @@ -1892,7 +1901,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, + "regionName is null"); } HRegion region = getRegion(regionName); - Integer lock = getLockFromId(delete.getLockId()); + Integer lock = getLockFromId(delete.getLockId()); if (region.getCoprocessorHost() != null) { Boolean result = region.getCoprocessorHost().preCheckAndDelete(row, family, qualifier, compareOp, comparator, delete); diff --git src/main/java/org/apache/hadoop/hbase/regionserver/Store.java src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index 97385c8..b41b961 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -24,10 +24,8 @@ import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.NavigableSet; -import java.util.Set; import java.util.SortedSet; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -39,27 +37,29 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.RemoteExceptionHandler; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.HeapSize; -import org.apache.hadoop.hbase.io.hfile.BlockCache; import org.apache.hadoop.hbase.io.hfile.Compression; import 
org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileScanner; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.monitoring.MonitoredTask; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.util.StringUtils; import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; -import com.google.common.collect.Collections2; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; -import com.google.common.collect.Sets; /** * A Store holds a column family in a Region. Its a memstore and a set of zero @@ -115,6 +115,7 @@ public class Store implements HeapSize { final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private final String storeNameStr; private final boolean inMemory; + private CompactionProgress progress; /* * List of store files inside this store. This is an immutable list that @@ -755,7 +756,7 @@ public class Store implements HeapSize { * @param dir * @throws IOException */ - public static long getLowestTimestamp(final List candidates) + public static long getLowestTimestamp(final List candidates) throws IOException { long minTs = Long.MAX_VALUE; for (StoreFile storeFile : candidates) { @@ -764,6 +765,13 @@ public class Store implements HeapSize { return minTs; } + /** getter for CompactionProgress object + * @return CompactionProgress object + */ + public CompactionProgress getCompactionProgress() { + return this.progress; + } + /* * @return True if we should run a major compaction. 
*/ @@ -819,7 +827,7 @@ public class Store implements HeapSize { } } else if (this.ttl != HConstants.FOREVER && oldest > this.ttl) { LOG.debug("Major compaction triggered on store " + this.storeNameStr + - ", because keyvalues outdated; time since last major compaction " + + ", because keyvalues outdated; time since last major compaction " + (now - lowTimestamp) + "ms"); result = true; } @@ -1079,6 +1087,9 @@ public class Store implements HeapSize { } } + // keep track of compaction progress + progress = new CompactionProgress(maxKeyCount); + // For each file, obtain a scanner: List scanners = StoreFileScanner .getScannersForStoreFiles(filesToCompact, false, false); @@ -1106,7 +1117,8 @@ public class Store implements HeapSize { // output to writer: for (KeyValue kv : kvs) { writer.append(kv); - + // update progress per key + ++progress.currentCompactedKVs; // check periodically to see if a system stop is requested if (Store.closeCheckInterval > 0) { bytesWritten += kv.getLength(); diff --git src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java new file mode 100644 index 0000000..9bc66e1 --- /dev/null +++ src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java @@ -0,0 +1,53 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver.compactions; + +/** + * This class holds information relevant for tracking the progress of a + * compaction. + * + *

The metrics tracked allow one to calculate the percent completion of the + * compaction based on the number of Key/Value pairs already compacted vs. + * total amount scheduled to be compacted. + * + */ +public class CompactionProgress { + + /** the total compacting key values in currently running compaction */ + public long totalCompactingKVs; + /** the completed count of key values in currently running compaction */ + public long currentCompactedKVs = 0; + + /** Constructor + * @param totalCompactingKVs the total Key/Value pairs to be compacted + */ + public CompactionProgress(long totalCompactingKVs) { + this.totalCompactingKVs = totalCompactingKVs; + } + + /** getter for calculated percent complete + * @return float + */ + public float getProgressPct() { + return (float) currentCompactedKVs / totalCompactingKVs; + } + +} diff --git src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java index a507f12..6ed24eb 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java @@ -77,7 +77,7 @@ public class CompactionRequest implements Comparable, /** * This function will define where in the priority queue the request will * end up. Those with the highest priorities will be first. When the - * priorities are the same it will It will first compare priority then date + * priorities are the same it will first compare priority then date * to maintain a FIFO functionality. * *

Note: The date is only accurate to the millisecond which means it is diff --git src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java index 04a2d13..27ed6bf 100644 --- src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java @@ -19,9 +19,15 @@ */ package org.apache.hadoop.hbase.regionserver; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.spy; + import java.io.IOException; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -31,20 +37,18 @@ import org.apache.hadoop.hbase.HBaseTestCase; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.hfile.HFileScanner; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; +import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hdfs.MiniDFSCluster; - import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.spy; /** @@ -161,9 +165,26 @@ public class TestCompaction extends HBaseTestCase { Result result = r.get(new Get(STARTROW).addFamily(COLUMN_FAMILY_TEXT).setMaxVersions(100), null); assertEquals(compactionThreshold, result.size()); + // see if CompactionProgress is in place but null + for 
(Store store: this.r.stores.values()) { + assertNull(store.getCompactionProgress()); + } + r.flushcache(); r.compactStores(true); + // see if CompactionProgress has done its thing on at least one store + int storeCount = 0; + for (Store store: this.r.stores.values()) { + CompactionProgress progress = store.getCompactionProgress(); + if( progress != null ) { + ++storeCount; + assert(progress.currentCompactedKVs > 0); + assert(progress.totalCompactingKVs > 0); + } + } + assert(storeCount > 0); + // look at the second row // Increment the least significant character so we get to next row. byte [] secondRowBytes = START_KEY.getBytes(HConstants.UTF8_ENCODING);