From 8d62aa3208dbb791ca979105956af33855d6f4c5 Mon Sep 17 00:00:00 2001 From: eshcar Date: Wed, 30 Dec 2015 16:38:46 +0200 Subject: [PATCH] HBASE-15016 adding RegionStoresProxy --- .../org/apache/hadoop/hbase/util/ClassSize.java | 4 + .../hadoop/hbase/regionserver/DefaultMemStore.java | 28 ++-- .../hbase/regionserver/FlushLargeStoresPolicy.java | 4 +- .../hadoop/hbase/regionserver/HMobStore.java | 7 + .../apache/hadoop/hbase/regionserver/HRegion.java | 141 +++++++++++++------- .../apache/hadoop/hbase/regionserver/HStore.java | 23 ++-- .../apache/hadoop/hbase/regionserver/MemStore.java | 13 +- .../apache/hadoop/hbase/regionserver/Region.java | 12 +- .../hbase/regionserver/RegionStoresProxy.java | 73 ++++++++++ .../apache/hadoop/hbase/regionserver/Store.java | 16 ++- .../org/apache/hadoop/hbase/TestIOFencing.java | 15 ++- .../hadoop/hbase/regionserver/TestHRegion.java | 119 +++++++++-------- 12 files changed, 319 insertions(+), 136 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionStoresProxy.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java index 77acf9b..fdd0fae 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java @@ -110,6 +110,8 @@ public class ClassSize { /** Overhead for CellSkipListSet */ public static final int CELL_SKIPLIST_SET; + public static final int STORE_SERVICES; + /* Are we running on jdk7? */ private static final boolean JDK7; static { @@ -193,6 +195,8 @@ public class ClassSize { TIMERANGE_TRACKER = align(ClassSize.OBJECT + Bytes.SIZEOF_LONG * 2); CELL_SKIPLIST_SET = align(OBJECT + REFERENCE); + + STORE_SERVICES = align(OBJECT + REFERENCE + ATOMIC_LONG); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java index 89ae0d1..aaa063d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java @@ -19,16 +19,6 @@ package org.apache.hadoop.hbase.regionserver; -import java.lang.management.ManagementFactory; -import java.lang.management.RuntimeMXBean; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.NavigableSet; -import java.util.SortedSet; -import java.util.concurrent.atomic.AtomicLong; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -49,6 +39,16 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.htrace.Trace; +import java.lang.management.ManagementFactory; +import java.lang.management.RuntimeMXBean; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NavigableSet; +import java.util.SortedSet; +import java.util.concurrent.atomic.AtomicLong; + /** * The MemStore holds in-memory modifications to the Store. Modifications * are {@link Cell}s. When asked to flush, current memstore is moved @@ -948,6 +948,14 @@ public class DefaultMemStore implements MemStore { return heapSize(); } + @Override + public void finalizeFlush() { + } + + @Override public long getMemStoreActiveSize() { + return size(); + } + /** * Code to help figure if our approximation of object heap sizes is close * enough. See hbase-900. Fills memstores then waits so user can heap diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java index b4d47c7..114f4ec 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java @@ -57,7 +57,7 @@ public class FlushLargeStoresPolicy extends FlushPolicy { } // For multiple families, lower bound is the "average flush size" by default // unless setting in configuration is larger. - long flushSizeLowerBound = region.getMemstoreFlushSize() / familyNumber; + long flushSizeLowerBound = region.getMemstoreFlushSizeLB() / familyNumber; long minimumLowerBound = getConf().getLong(HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN, DEFAULT_HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND_MIN); @@ -89,7 +89,7 @@ public class FlushLargeStoresPolicy extends FlushPolicy { } private boolean shouldFlush(Store store) { - if (store.getMemStoreSize() > this.flushSizeLowerBound) { + if (store.getMemStoreActiveSize() > this.flushSizeLowerBound) { if (LOG.isDebugEnabled()) { LOG.debug("Flush Column Family " + store.getColumnFamilyName() + " of " + region.getRegionInfo().getEncodedName() + " because memstoreSize=" + diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java index faf6d81..42d22a8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java @@ -511,6 +511,13 @@ public class HMobStore extends HStore { } } + @Override public void finalizeFlush() { + } + + @Override public long getMemStoreActiveSize() { + return getMemStoreSize(); + } + public void updateCellsCountCompactedToMob(long count) { cellsCountCompactedToMob += count; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 9549a13..917324b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -18,6 +18,20 @@ */ package org.apache.hadoop.hbase.regionserver; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.io.Closeables; +import com.google.protobuf.ByteString; +import com.google.protobuf.Descriptors; +import com.google.protobuf.Message; +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcController; +import com.google.protobuf.Service; +import com.google.protobuf.TextFormat; + import java.io.EOFException; import java.io.FileNotFoundException; import java.io.IOException; @@ -185,20 +199,6 @@ import org.apache.hadoop.util.StringUtils; import org.apache.htrace.Trace; import org.apache.htrace.TraceScope; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.io.Closeables; -import com.google.protobuf.ByteString; -import com.google.protobuf.Descriptors; -import com.google.protobuf.Message; -import com.google.protobuf.RpcCallback; -import com.google.protobuf.RpcController; -import com.google.protobuf.Service; -import com.google.protobuf.TextFormat; - @InterfaceAudience.Private public class HRegion implements HeapSize, PropagatingConfigurationObserver, Region { private static final Log LOG = LogFactory.getLog(HRegion.class); @@ -273,6 +273,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi private Map coprocessorServiceHandlers = Maps.newHashMap(); private final AtomicLong memstoreSize = new AtomicLong(0); + private final RegionStoresProxy regionStoresProxy = new RegionStoresProxy(this); // Debug possible data loss due to WAL off final Counter numMutationsWithoutWAL = new Counter(); @@ -561,7 +562,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi final WriteState writestate = new WriteState(); - long memstoreFlushSize; + long memstoreFlushSizeLB; + private long memStoreFlushSizeUB; final long timestampSlop; final long rowProcessorTimeout; @@ -756,10 +758,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE); } - this.memstoreFlushSize = flushSize; - this.blockingMemStoreSize = this.memstoreFlushSize * + this.memstoreFlushSizeLB = flushSize; + this.blockingMemStoreSize = this.memstoreFlushSizeLB * conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER); + // set force flush size to be between flush size and blocking size + this.memStoreFlushSizeUB = (this.memstoreFlushSizeLB + this.blockingMemStoreSize) / 2; + } /** @@ -1029,6 +1034,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return false; } + public void blockUpdates() { + this.updatesLock.writeLock().lock(); + } + + public void unblockUpdates() { + this.updatesLock.writeLock().unlock(); + } + @Override public HDFSBlocksDistribution getHDFSBlocksDistribution() { HDFSBlocksDistribution hdfsBlocksDistribution = @@ -1141,6 +1154,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return memstoreSize.get(); } + public RegionStoresProxy getRegionStoresProxy() { + return regionStoresProxy; + } + @Override public long getNumMutationsWithoutWAL() { return numMutationsWithoutWAL.get(); @@ -2510,6 +2527,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } // If we get to here, the HStores have been written. + for(Store storeToFlush :storesToFlush) { + storeToFlush.finalizeFlush(); + } if (wal != null) { wal.completeCacheFlush(this.getRegionInfo().getEncodedNameAsBytes()); } @@ -2915,10 +2935,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi initialized = true; } long addedSize = doMiniBatchMutation(batchOp); - long newSize = this.addAndGetGlobalMemstoreSize(addedSize); - if (isFlushSize(newSize)) { - requestFlush(); - } + this.addAndGetGlobalMemstoreSize(addedSize); + requestFlushIfNeeded(); } } finally { closeRegionOperation(op); @@ -3879,6 +3897,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } + private void requestFlushIfNeeded() throws RegionTooBusyException { + long memstoreTotalSize = this.getMemstoreSize(); + long memstoreActiveSize = this.getRegionStoresProxy().getGlobalMemstoreActiveSize(); + long memstoreUpperThreshold = this.getMemStoreFlushSizeUB(); + long memstoreLowerThreshold = this.getMemstoreFlushSizeLB(); + + if(memstoreActiveSize > memstoreLowerThreshold || + memstoreTotalSize > memstoreUpperThreshold) { + requestFlush(); + } + } + private void requestFlush() { if (this.rsServices == null) { return; @@ -3901,7 +3931,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * @return True if size is over the flush threshold */ private boolean isFlushSize(final long size) { - return size > this.memstoreFlushSize; + return size > this.memstoreFlushSizeLB; } /** @@ -5263,7 +5293,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi long c = count.decrementAndGet(); if (c <= 0) { synchronized (lock) { - if (count.get() <= 0 ){ + if (count.get() <= 0){ usable.set(false); RowLockContext removed = lockedRows.remove(row); assert removed == this: "we should never remove a different context"; @@ -6068,7 +6098,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi protected boolean isStopRow(Cell currentRowCell) { return currentRowCell == null - || (stopRow != null && comparator.compareRows(currentRowCell, stopRow, 0, stopRow.length) >= isScan); + || (stopRow != null && comparator.compareRows(currentRowCell, stopRow, 0, stopRow + .length) >= isScan); } @Override @@ -6806,7 +6837,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } ClientProtos.RegionLoadStats.Builder stats = ClientProtos.RegionLoadStats.newBuilder(); stats.setMemstoreLoad((int) (Math.min(100, (this.memstoreSize.get() * 100) / this - .memstoreFlushSize))); + .memstoreFlushSizeLB))); stats.setHeapOccupancy((int)rsServices.getHeapMemoryManager().getHeapOccupancyPercent()*100); stats.setCompactionPressure((int)rsServices.getCompactionPressure()*100 > 100 ? 100 : (int)rsServices.getCompactionPressure()*100); @@ -6984,9 +7015,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } finally { closeRegionOperation(); - if (!mutations.isEmpty() && - isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize))) { - requestFlush(); + if (!mutations.isEmpty()) { + this.addAndGetGlobalMemstoreSize(addedSize); + requestFlushIfNeeded(); } } } @@ -7088,7 +7119,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi byte[] row = mutate.getRow(); checkRow(row, op.toString()); checkFamilies(mutate.getFamilyCellMap().keySet()); - boolean flush = false; Durability durability = getEffectiveDurability(mutate.getDurability()); boolean writeToWAL = durability != Durability.SKIP_WAL; WALEdit walEdits = null; @@ -7268,8 +7298,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi allKVs.addAll(entry.getValue()); } - size = this.addAndGetGlobalMemstoreSize(size); - flush = isFlushSize(size); + this.addAndGetGlobalMemstoreSize(size); } } finally { this.updatesLock.readLock().unlock(); @@ -7303,10 +7332,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi this.metricsRegion.updateAppend(); } - if (flush) { - // Request a cache flush. Do it outside update lock. - requestFlush(); - } + // Request a cache flush. Do it outside update lock. + requestFlushIfNeeded(); return mutate.isReturnResults() ? Result.create(allKVs) : null; } @@ -7330,7 +7357,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi byte [] row = mutation.getRow(); checkRow(row, op.toString()); checkFamilies(mutation.getFamilyCellMap().keySet()); - boolean flush = false; Durability durability = getEffectiveDurability(mutation.getDurability()); boolean writeToWAL = durability != Durability.SKIP_WAL; WALEdit walEdits = null; @@ -7494,8 +7520,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } } - size = this.addAndGetGlobalMemstoreSize(size); - flush = isFlushSize(size); + this.addAndGetGlobalMemstoreSize(size); } } finally { this.updatesLock.readLock().unlock(); @@ -7528,10 +7553,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } - if (flush) { - // Request a cache flush. Do it outside update lock. - requestFlush(); - } + // Request a cache flush. Do it outside update lock. + requestFlushIfNeeded(); return mutation.isReturnResults() ? Result.create(allKVs) : null; } @@ -7551,8 +7574,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi public static final long FIXED_OVERHEAD = ClassSize.align( ClassSize.OBJECT + ClassSize.ARRAY + - 44 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT + - (14 * Bytes.SIZEOF_LONG) + + 45 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT + + (15 * Bytes.SIZEOF_LONG) + 5 * Bytes.SIZEOF_BOOLEAN); // woefully out of date - currently missing: @@ -7576,6 +7599,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi MultiVersionConcurrencyControl.FIXED_SIZE // mvcc + ClassSize.TREEMAP // maxSeqIdInStores + 2 * ClassSize.ATOMIC_INTEGER // majorInProgress, minorInProgress + + ClassSize.STORE_SERVICES // store services ; @Override @@ -8184,7 +8208,32 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi : CellComparator.COMPARATOR; } - public long getMemstoreFlushSize() { - return this.memstoreFlushSize; + public long getMemstoreFlushSizeLB() { + return this.memstoreFlushSizeLB; + } + + private long getMemStoreFlushSizeUB() { + return this.memStoreFlushSizeUB; + } + + //// method for debugging tests + public void throwException(String title, String regionName) { + String msg = title + ", "; + msg += getRegionInfo().toString(); + msg += getRegionInfo().isMetaRegion() ? " meta region " : " "; + msg += getRegionInfo().isMetaTable() ? " meta table " : " "; + msg += "stores: "; + for (Store s : getStores()) { + msg += s.getFamily().getNameAsString(); + msg += " size: "; + msg += s.getMemStoreSize(); + msg += " "; + } + msg += "end-of-stores"; + msg += ", memstore size "; + msg += getMemstoreSize(); + if (getRegionInfo().getRegionNameAsString().startsWith(regionName)) { + throw new RuntimeException(msg); + } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index badbd65..ac2ce2c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -43,6 +43,12 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantReadWriteLock; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableCollection; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -94,13 +100,6 @@ import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableCollection; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; - /** * A Store holds a column family in a Region. Its a memstore and a set of zero * or more StoreFiles, which stretch backwards over time. @@ -349,7 +348,7 @@ public class HStore implements Store { @Override public long getMemstoreFlushSize() { // TODO: Why is this in here? The flushsize of the region rather than the store? St.Ack - return this.region.memstoreFlushSize; + return this.region.memstoreFlushSizeLB; } @Override @@ -2317,6 +2316,14 @@ public class HStore implements Store { removeCompactedFiles(copyCompactedfiles); } + @Override public void finalizeFlush() { + memstore.finalizeFlush(); + } + + @Override public long getMemStoreActiveSize() { + return memstore.getMemStoreActiveSize(); + } + private ThreadPoolExecutor getThreadPoolExecutor(int maxThreads) { return Threads.getBoundedCachedThreadPool(maxThreads, maxThreads * 3, TimeUnit.SECONDS, new ThreadFactory() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java index e9f8103..94197a4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java @@ -19,8 +19,8 @@ package org.apache.hadoop.hbase.regionserver; import java.util.List; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.HeapSize; /** @@ -134,4 +134,15 @@ public interface MemStore extends HeapSize { * @return Total memory occupied by this MemStore. */ long size(); + + /** + * This method is called when it is clear that the flush to disk is completed. + * The store may do any post-flush actions at this point. + * One example is to update the wal with sequence number that is known only at the store level. + */ + void finalizeFlush(); + /** + * @return the size by which the flush policy decided whether or not to flush the store. + */ + long getMemStoreActiveSize(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java index 6d87057..bafe117 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java @@ -22,6 +22,10 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.Message; +import com.google.protobuf.RpcController; +import com.google.protobuf.Service; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.HBaseInterfaceAudience; @@ -49,11 +53,6 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServic import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay; -import com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.Message; -import com.google.protobuf.RpcController; -import com.google.protobuf.Service; - /** * Regions store data for a certain region of a table. It stores all columns * for each row. A given table consists of one or more Regions. @@ -190,6 +189,9 @@ public interface Region extends ConfigurationObserver { /** @return memstore size for this region, in bytes */ long getMemstoreSize(); + /** @return store services for this region, to access services required by store level needs */ + RegionStoresProxy getRegionStoresProxy(); + /** @return the number of mutations processed bypassing the WAL */ long getNumMutationsWithoutWAL(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionStoresProxy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionStoresProxy.java new file mode 100644 index 0000000..63d310e --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionStoresProxy.java @@ -0,0 +1,73 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.wal.WAL; + +/** + * RegionStoresProxy class is the interface through which memstore access services at the region level. + * It also maintains additional data that is updated by memstores and can be queried by the region. + * For example, when using alternative memory formats or due to compaction the memstore needs to + * take occasional lock and update size counters at the region level. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class RegionStoresProxy { + + private final HRegion region; + + // size of fluctuating memstore segments, e.g., in compaction pipeline + private final AtomicLong memstoreFluctuatingSize = new AtomicLong(0); + + public RegionStoresProxy(HRegion region) { + this.region = region; + + } + + public void blockUpdates() { + this.region.blockUpdates(); + } + + public void unblockUpdates() { + this.region.unblockUpdates(); + } + + public long addAndGetGlobalMemstoreSize(long size) { + return this.region.addAndGetGlobalMemstoreSize(size); + } + + public long addAndGetGlobalMemstoreFluctuatingSize(long size) { + return this.memstoreFluctuatingSize.addAndGet(size); + } + + public long getGlobalMemstoreActiveSize() { + return this.region.getMemstoreSize() - memstoreFluctuatingSize.get(); + } + + public long getWalSequenceId() throws IOException { + WAL wal = this.region.getWAL(); + return this.region.getNextSequenceId(wal); + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index 8bb10f0..ec15ab6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -22,8 +22,6 @@ import java.util.Collection; import java.util.List; import java.util.NavigableSet; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; @@ -32,6 +30,8 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver; import org.apache.hadoop.hbase.io.HeapSize; @@ -490,4 +490,16 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf * Closes and archives the compacted files under this store */ void closeAndArchiveCompactedFiles() throws IOException; + + /** + * This method is called when it is clear that the flush to disk is completed. + * The store may do any post-flush actions at this point. + * One example is to update the wal with sequence number that is known only at the store level. + */ + void finalizeFlush(); + + /** + * @return the size by which the flush policy decided whether or not to flush the store. + */ + long getMemStoreActiveSize(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java index 94a63d8..599f4c1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java @@ -17,21 +17,18 @@ */ package org.apache.hadoop.hbase; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - import java.io.IOException; import java.util.Collection; import java.util.List; import java.util.concurrent.CountDownLatch; +import com.google.common.collect.Lists; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor; @@ -55,7 +52,8 @@ import org.apache.hadoop.hbase.wal.WAL; import org.junit.Test; import org.junit.experimental.categories.Category; -import com.google.common.collect.Lists; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; /** * Test for the case where a regionserver going down has enough cycles to do damage to regions @@ -207,6 +205,13 @@ public class TestIOFencing { } super.completeCompaction(compactedFiles); } + + @Override public void finalizeFlush() { + } + + @Override public long getMemStoreActiveSize() { + return getMemStoreSize(); + } } private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 35de488..ee8752c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -19,53 +19,10 @@ package org.apache.hadoop.hbase.regionserver; -import static org.apache.hadoop.hbase.HBaseTestingUtility.COLUMNS; -import static org.apache.hadoop.hbase.HBaseTestingUtility.FIRST_CHAR; -import static org.apache.hadoop.hbase.HBaseTestingUtility.LAST_CHAR; -import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY; -import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1; -import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2; -import static org.apache.hadoop.hbase.HBaseTestingUtility.fam3; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyBoolean; -import static org.mockito.Matchers.anyLong; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import java.io.IOException; -import java.io.InterruptedIOException; -import java.security.PrivilegedExceptionAction; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.NavigableMap; -import java.util.TreeMap; -import java.util.UUID; -import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; - +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.protobuf.ByteString; import org.apache.commons.lang.RandomStringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -176,10 +133,52 @@ import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.protobuf.ByteString; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.hadoop.hbase.HBaseTestingUtility.COLUMNS; +import static org.apache.hadoop.hbase.HBaseTestingUtility.FIRST_CHAR; +import static org.apache.hadoop.hbase.HBaseTestingUtility.LAST_CHAR; +import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY; +import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1; +import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2; +import static org.apache.hadoop.hbase.HBaseTestingUtility.fam3; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** * Basic stand-alone testing of HRegion. No clusters! @@ -1271,7 +1270,8 @@ public class TestHRegion { private final AtomicInteger count; private Exception e; - GetTillDoneOrException(final int i, final byte[] r, final AtomicBoolean d, final AtomicInteger c) { + GetTillDoneOrException(final int i, final byte[] r, final AtomicBoolean d, final + AtomicInteger c) { super("getter." + i); this.g = new Get(r); this.done = d; @@ -2764,7 +2764,8 @@ public class TestHRegion { } catch (NotServingRegionException e) { // this is the correct exception that is expected } catch (IOException e) { - fail("Got wrong type of exception - should be a NotServingRegionException, but was an IOException: " + fail("Got wrong type of exception - should be a NotServingRegionException, but was an " + + "IOException: " + e.getMessage()); } } finally { @@ -2962,7 +2963,8 @@ public class TestHRegion { } @Test - public void testScanner_ExplicitColumns_FromMemStoreAndFiles_EnforceVersions() throws IOException { + public void testScanner_ExplicitColumns_FromMemStoreAndFiles_EnforceVersions() throws + IOException { byte[] row1 = Bytes.toBytes("row1"); byte[] fam1 = Bytes.toBytes("fam1"); byte[][] families = { fam1 }; @@ -4960,7 +4962,8 @@ public class TestHRegion { // move the file of the primary region to the archive, simulating a compaction Collection storeFiles = primaryRegion.getStore(families[0]).getStorefiles(); primaryRegion.getRegionFileSystem().removeStoreFiles(Bytes.toString(families[0]), storeFiles); - Collection storeFileInfos = primaryRegion.getRegionFileSystem().getStoreFiles(families[0]); + Collection storeFileInfos = primaryRegion.getRegionFileSystem() + .getStoreFiles(families[0]); Assert.assertTrue(storeFileInfos == null || storeFileInfos.size() == 0); verifyData(secondaryRegion, 0, 1000, cq, families); @@ -4974,7 +4977,8 @@ public class TestHRegion { } } - private void putData(int startRow, int numRows, byte[] qf, byte[]... families) throws IOException { + private void putData(int startRow, int numRows, byte[] qf, byte[]... families) throws + IOException { putData(this.region, startRow, numRows, qf, families); } @@ -5657,7 +5661,8 @@ public class TestHRegion { currRow.clear(); hasNext = scanner.next(currRow); assertEquals(2, currRow.size()); - assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow.get(0).getRowLength(), row4, 0, + assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), + currRow.get(0).getRowLength(), row4, 0, row4.length)); assertTrue(hasNext); // 2. scan out "row3" (2 kv) -- 1.7.10.2 (Apple Git-33)