From a9ca76230362e3fdaf1073175fd18f6924ac9714 Mon Sep 17 00:00:00 2001 From: eshcar Date: Sun, 27 Dec 2015 10:42:15 +0200 Subject: [PATCH] HBASE-15016 Adding StoreServices --- .../org/apache/hadoop/hbase/util/ClassSize.java | 4 + .../hadoop/hbase/regionserver/DefaultMemStore.java | 8 ++ .../hbase/regionserver/FlushLargeStoresPolicy.java | 2 +- .../hadoop/hbase/regionserver/HMobStore.java | 7 ++ .../apache/hadoop/hbase/regionserver/HRegion.java | 101 +++++++++++++---- .../apache/hadoop/hbase/regionserver/HStore.java | 69 +++++++----- .../apache/hadoop/hbase/regionserver/MemStore.java | 17 ++- .../apache/hadoop/hbase/regionserver/Region.java | 20 ++-- .../apache/hadoop/hbase/regionserver/Store.java | 26 +++-- .../hadoop/hbase/regionserver/StoreServices.java | 73 ++++++++++++ .../org/apache/hadoop/hbase/TestIOFencing.java | 25 ++-- .../hadoop/hbase/regionserver/TestHRegion.java | 119 ++++++++++---------- 12 files changed, 328 insertions(+), 143 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreServices.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java index 77acf9b..fdd0fae 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java @@ -110,6 +110,8 @@ public class ClassSize { /** Overhead for CellSkipListSet */ public static final int CELL_SKIPLIST_SET; + public static final int STORE_SERVICES; + /* Are we running on jdk7? */ private static final boolean JDK7; static { @@ -193,6 +195,8 @@ public class ClassSize { TIMERANGE_TRACKER = align(ClassSize.OBJECT + Bytes.SIZEOF_LONG * 2); CELL_SKIPLIST_SET = align(OBJECT + REFERENCE); + + STORE_SERVICES = align(OBJECT + REFERENCE + ATOMIC_LONG); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java index 89ae0d1..8e39ec1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java @@ -948,6 +948,14 @@ public class DefaultMemStore implements MemStore { return heapSize(); } + @Override + public void finalizeFlush() { + } + + @Override public long getMemStoreSizeForFlushPolicy() { + return size(); + } + /** * Code to help figure if our approximation of object heap sizes is close * enough. See hbase-900. Fills memstores then waits so user can heap diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java index b4d47c7..0cc26f2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java @@ -89,7 +89,7 @@ public class FlushLargeStoresPolicy extends FlushPolicy { } private boolean shouldFlush(Store store) { - if (store.getMemStoreSize() > this.flushSizeLowerBound) { + if (store.getMemStoreSizeForFlushPolicy() > this.flushSizeLowerBound) { if (LOG.isDebugEnabled()) { LOG.debug("Flush Column Family " + store.getColumnFamilyName() + " of " + region.getRegionInfo().getEncodedName() + " because memstoreSize=" + diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java index faf6d81..98e6ee3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java @@ -511,6 +511,13 @@ public class HMobStore extends HStore { } } + @Override public void finalizeFlush() { + } + + @Override public long getMemStoreSizeForFlushPolicy() { + return getMemStoreSize(); + } + public void updateCellsCountCompactedToMob(long count) { cellsCountCompactedToMob += count; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 9549a13..0e33280 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -273,6 +273,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi private Map coprocessorServiceHandlers = Maps.newHashMap(); private final AtomicLong memstoreSize = new AtomicLong(0); + private final StoreServices storeServices = new StoreServices(this); // Debug possible data loss due to WAL off final Counter numMutationsWithoutWAL = new Counter(); @@ -572,6 +573,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi final RegionServerServices rsServices; private RegionServerAccounting rsAccounting; private long flushCheckInterval; + // In some cases we want to have a soft flush threshold + private long memStoreSoftFlushSize; // flushPerChanges is to prevent too many changes in memstore private long flushPerChanges; private long blockingMemStoreSize; @@ -760,6 +763,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi this.blockingMemStoreSize = this.memstoreFlushSize * conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER); + // set force flush size to be between flush size and blocking size + this.memStoreSoftFlushSize = (this.memstoreFlushSize + this.blockingMemStoreSize) / 2; + } /** @@ -1029,6 +1035,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return false; } + public void blockUpdates() { + this.updatesLock.writeLock().lock(); + } + + public void unblockUpdates() { + this.updatesLock.writeLock().unlock(); + } + @Override public HDFSBlocksDistribution getHDFSBlocksDistribution() { HDFSBlocksDistribution hdfsBlocksDistribution = @@ -1142,6 +1156,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } @Override + public StoreServices getStoreServices() { + return storeServices; + } + + @Override public long getNumMutationsWithoutWAL() { return numMutationsWithoutWAL.get(); } @@ -2510,6 +2529,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } // If we get to here, the HStores have been written. + for(Store storeToFlush :storesToFlush) { + storeToFlush.finalizeFlush(); + } if (wal != null) { wal.completeCacheFlush(this.getRegionInfo().getEncodedNameAsBytes()); } @@ -2915,10 +2937,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi initialized = true; } long addedSize = doMiniBatchMutation(batchOp); - long newSize = this.addAndGetGlobalMemstoreSize(addedSize); - if (isFlushSize(newSize)) { - requestFlush(); - } + this.addAndGetGlobalMemstoreSize(addedSize); + requestFlushIfNeeded(); } } finally { closeRegionOperation(op); @@ -3879,6 +3899,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } + private void requestFlushIfNeeded() throws RegionTooBusyException { + long memstoreUpperSize = this.getMemstoreSize(); + long memstoreLowerSize = this.getStoreServices().getMemstoreActiveSize(); + long memstoreUpperThreshold = this.getMemStoreSoftFlushSize(); + long memstoreLowerThreshold = this.getMemstoreFlushSize(); + + if(memstoreLowerSize > memstoreLowerThreshold || + memstoreUpperSize > memstoreUpperThreshold) { + requestFlush(); + } + } + private void requestFlush() { if (this.rsServices == null) { return; @@ -5263,7 +5295,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi long c = count.decrementAndGet(); if (c <= 0) { synchronized (lock) { - if (count.get() <= 0 ){ + if (count.get() <= 0){ usable.set(false); RowLockContext removed = lockedRows.remove(row); assert removed == this: "we should never remove a different context"; @@ -6068,7 +6100,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi protected boolean isStopRow(Cell currentRowCell) { return currentRowCell == null - || (stopRow != null && comparator.compareRows(currentRowCell, stopRow, 0, stopRow.length) >= isScan); + || (stopRow != null && comparator.compareRows(currentRowCell, stopRow, 0, stopRow + .length) >= isScan); } @Override @@ -6984,9 +7017,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } finally { closeRegionOperation(); - if (!mutations.isEmpty() && - isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize))) { - requestFlush(); + if (!mutations.isEmpty()) { + this.addAndGetGlobalMemstoreSize(addedSize); + requestFlushIfNeeded(); } } } @@ -7088,7 +7121,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi byte[] row = mutate.getRow(); checkRow(row, op.toString()); checkFamilies(mutate.getFamilyCellMap().keySet()); - boolean flush = false; Durability durability = getEffectiveDurability(mutate.getDurability()); boolean writeToWAL = durability != Durability.SKIP_WAL; WALEdit walEdits = null; @@ -7268,8 +7300,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi allKVs.addAll(entry.getValue()); } - size = this.addAndGetGlobalMemstoreSize(size); - flush = isFlushSize(size); + this.addAndGetGlobalMemstoreSize(size); } } finally { this.updatesLock.readLock().unlock(); @@ -7303,10 +7334,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi this.metricsRegion.updateAppend(); } - if (flush) { - // Request a cache flush. Do it outside update lock. - requestFlush(); - } + // Request a cache flush. Do it outside update lock. + requestFlushIfNeeded(); return mutate.isReturnResults() ? Result.create(allKVs) : null; } @@ -7330,7 +7359,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi byte [] row = mutation.getRow(); checkRow(row, op.toString()); checkFamilies(mutation.getFamilyCellMap().keySet()); - boolean flush = false; Durability durability = getEffectiveDurability(mutation.getDurability()); boolean writeToWAL = durability != Durability.SKIP_WAL; WALEdit walEdits = null; @@ -7494,8 +7522,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } } - size = this.addAndGetGlobalMemstoreSize(size); - flush = isFlushSize(size); + this.addAndGetGlobalMemstoreSize(size); } } finally { this.updatesLock.readLock().unlock(); @@ -7528,10 +7555,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } - if (flush) { - // Request a cache flush. Do it outside update lock. - requestFlush(); - } + // Request a cache flush. Do it outside update lock. + requestFlushIfNeeded(); return mutation.isReturnResults() ? Result.create(allKVs) : null; } @@ -7551,8 +7576,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi public static final long FIXED_OVERHEAD = ClassSize.align( ClassSize.OBJECT + ClassSize.ARRAY + - 44 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT + - (14 * Bytes.SIZEOF_LONG) + + 45 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT + + (15 * Bytes.SIZEOF_LONG) + 5 * Bytes.SIZEOF_BOOLEAN); // woefully out of date - currently missing: @@ -7576,6 +7601,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi MultiVersionConcurrencyControl.FIXED_SIZE // mvcc + ClassSize.TREEMAP // maxSeqIdInStores + 2 * ClassSize.ATOMIC_INTEGER // majorInProgress, minorInProgress + + ClassSize.STORE_SERVICES // store services ; @Override @@ -8187,4 +8213,29 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi public long getMemstoreFlushSize() { return this.memstoreFlushSize; } + + private long getMemStoreSoftFlushSize() { + return this.memStoreSoftFlushSize; + } + + //// method for debugging tests + public void throwException(String title, String regionName) { + String msg = title + ", "; + msg += getRegionInfo().toString(); + msg += getRegionInfo().isMetaRegion() ? " meta region " : " "; + msg += getRegionInfo().isMetaTable() ? " meta table " : " "; + msg += "stores: "; + for (Store s : getStores()) { + msg += s.getFamily().getNameAsString(); + msg += " size: "; + msg += s.getMemStoreSize(); + msg += " "; + } + msg += "end-of-stores"; + msg += ", memstore size "; + msg += getMemstoreSize(); + if (getRegionInfo().getRegionNameAsString().startsWith(regionName)) { + throw new RuntimeException(msg); + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index badbd65..97ea43b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -18,31 +18,12 @@ */ package org.apache.hadoop.hbase.regionserver; -import java.io.IOException; -import java.io.InterruptedIOException; -import java.net.InetSocketAddress; -import java.security.PrivilegedExceptionAction; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.NavigableSet; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.CompletionService; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.Future; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.ReentrantReadWriteLock; - +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableCollection; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -94,12 +75,30 @@ import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableCollection; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.net.InetSocketAddress; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.NavigableSet; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantReadWriteLock; /** * A Store holds a column family in a Region. Its a memstore and a set of zero @@ -2317,6 +2316,14 @@ public class HStore implements Store { removeCompactedFiles(copyCompactedfiles); } + @Override public void finalizeFlush() { + memstore.finalizeFlush(); + } + + @Override public long getMemStoreSizeForFlushPolicy() { + return memstore.getMemStoreSizeForFlushPolicy(); + } + private ThreadPoolExecutor getThreadPoolExecutor(int maxThreads) { return Threads.getBoundedCachedThreadPool(maxThreads, maxThreads * 3, TimeUnit.SECONDS, new ThreadFactory() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java index e9f8103..ac7ecc9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java @@ -17,12 +17,12 @@ */ package org.apache.hadoop.hbase.regionserver; -import java.util.List; - -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.HeapSize; +import java.util.List; + /** * The MemStore holds in-memory modifications to the Store. Modifications are {@link Cell}s. *

@@ -134,4 +134,15 @@ public interface MemStore extends HeapSize { * @return Total memory occupied by this MemStore. */ long size(); + + /** + * This method is called when it is clear that the flush to disk is completed. + * The store may do any post-flush actions at this point. + * One example is to update the wal with sequence number that is known only at the store level. + */ + void finalizeFlush(); + /** + * @return the size by which the flush policy decided whether or not to flush the store. + */ + long getMemStoreSizeForFlushPolicy(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java index 6d87057..14046f8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java @@ -17,11 +17,10 @@ */ package org.apache.hadoop.hbase.regionserver; -import java.io.IOException; -import java.util.Collection; -import java.util.List; -import java.util.Map; - +import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.Message; +import com.google.protobuf.RpcController; +import com.google.protobuf.Service; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.HBaseInterfaceAudience; @@ -49,10 +48,10 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServic import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay; -import com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.Message; -import com.google.protobuf.RpcController; -import com.google.protobuf.Service; +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; /** * Regions store data for a certain region of a table. It stores all columns @@ -190,6 +189,9 @@ public interface Region extends ConfigurationObserver { /** @return memstore size for this region, in bytes */ long getMemstoreSize(); + /** @return store services for this region, to access services required by store level needs */ + StoreServices getStoreServices(); + /** @return the number of mutations processed bypassing the WAL */ long getNumMutationsWithoutWAL(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index 8bb10f0..f594920 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -17,13 +17,6 @@ */ package org.apache.hadoop.hbase.regionserver; -import java.io.IOException; -import java.util.Collection; -import java.util.List; -import java.util.NavigableSet; - -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; @@ -32,6 +25,8 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver; import org.apache.hadoop.hbase.io.HeapSize; @@ -45,6 +40,11 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; import org.apache.hadoop.hbase.security.User; +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.NavigableSet; + /** * Interface for objects that hold a column family in a Region. Its a memstore and a set of zero or * more StoreFiles, which stretch backwards over time. @@ -490,4 +490,16 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf * Closes and archives the compacted files under this store */ void closeAndArchiveCompactedFiles() throws IOException; + + /** + * This method is called when it is clear that the flush to disk is completed. + * The store may do any post-flush actions at this point. + * One example is to update the wal with sequence number that is known only at the store level. + */ + void finalizeFlush(); + + /** + * @return the size by which the flush policy decided whether or not to flush the store. + */ + long getMemStoreSizeForFlushPolicy(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreServices.java new file mode 100644 index 0000000..43cf7dc --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreServices.java @@ -0,0 +1,73 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.wal.WAL; + +/** + * StoreServices class is the interface through which memstore access services at the region level. + * It also maintains additional data that is updated by memstores and can be queried by the region. + * For example, when using alternative memory formats or due to compaction the memstore needs to + * take occasional lock and update size counters at the region level. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class StoreServices { + + private final HRegion region; + + // size of fluctuating memstore segments, e.g., in compaction pipeline + private final AtomicLong memstoreFluctuatingSize = new AtomicLong(0); + + public StoreServices(HRegion region) { + this.region = region; + + } + + public void blockUpdates() { + this.region.blockUpdates(); + } + + public void unblockUpdates() { + this.region.unblockUpdates(); + } + + public long addAndGetGlobalMemstoreSize(long size) { + return this.region.addAndGetGlobalMemstoreSize(size); + } + + public long addAndGetFluctuatingMemstoreSize(long size) { + return this.memstoreFluctuatingSize.addAndGet(size); + } + + public long getMemstoreActiveSize() { + return this.region.getMemstoreSize() - memstoreFluctuatingSize.get(); + } + + public long getWalSequenceId() throws IOException { + WAL wal = this.region.getWAL(); + return this.region.getNextSequenceId(wal); + } + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java index 94a63d8..1612787 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java @@ -17,21 +17,13 @@ */ package org.apache.hadoop.hbase; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.util.Collection; -import java.util.List; -import java.util.concurrent.CountDownLatch; - +import com.google.common.collect.Lists; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor; @@ -55,7 +47,13 @@ import org.apache.hadoop.hbase.wal.WAL; import org.junit.Test; import org.junit.experimental.categories.Category; -import com.google.common.collect.Lists; +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CountDownLatch; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; /** * Test for the case where a regionserver going down has enough cycles to do damage to regions @@ -207,6 +205,13 @@ public class TestIOFencing { } super.completeCompaction(compactedFiles); } + + @Override public void finalizeFlush() { + } + + @Override public long getMemStoreSizeForFlushPolicy() { + return getMemStoreSize(); + } } private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 35de488..ee8752c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -19,53 +19,10 @@ package org.apache.hadoop.hbase.regionserver; -import static org.apache.hadoop.hbase.HBaseTestingUtility.COLUMNS; -import static org.apache.hadoop.hbase.HBaseTestingUtility.FIRST_CHAR; -import static org.apache.hadoop.hbase.HBaseTestingUtility.LAST_CHAR; -import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY; -import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1; -import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2; -import static org.apache.hadoop.hbase.HBaseTestingUtility.fam3; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyBoolean; -import static org.mockito.Matchers.anyLong; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import java.io.IOException; -import java.io.InterruptedIOException; -import java.security.PrivilegedExceptionAction; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.NavigableMap; -import java.util.TreeMap; -import java.util.UUID; -import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; - +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.protobuf.ByteString; import org.apache.commons.lang.RandomStringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -176,10 +133,52 @@ import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.protobuf.ByteString; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.hadoop.hbase.HBaseTestingUtility.COLUMNS; +import static org.apache.hadoop.hbase.HBaseTestingUtility.FIRST_CHAR; +import static org.apache.hadoop.hbase.HBaseTestingUtility.LAST_CHAR; +import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY; +import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1; +import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2; +import static org.apache.hadoop.hbase.HBaseTestingUtility.fam3; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** * Basic stand-alone testing of HRegion. No clusters! @@ -1271,7 +1270,8 @@ public class TestHRegion { private final AtomicInteger count; private Exception e; - GetTillDoneOrException(final int i, final byte[] r, final AtomicBoolean d, final AtomicInteger c) { + GetTillDoneOrException(final int i, final byte[] r, final AtomicBoolean d, final + AtomicInteger c) { super("getter." + i); this.g = new Get(r); this.done = d; @@ -2764,7 +2764,8 @@ public class TestHRegion { } catch (NotServingRegionException e) { // this is the correct exception that is expected } catch (IOException e) { - fail("Got wrong type of exception - should be a NotServingRegionException, but was an IOException: " + fail("Got wrong type of exception - should be a NotServingRegionException, but was an " + + "IOException: " + e.getMessage()); } } finally { @@ -2962,7 +2963,8 @@ public class TestHRegion { } @Test - public void testScanner_ExplicitColumns_FromMemStoreAndFiles_EnforceVersions() throws IOException { + public void testScanner_ExplicitColumns_FromMemStoreAndFiles_EnforceVersions() throws + IOException { byte[] row1 = Bytes.toBytes("row1"); byte[] fam1 = Bytes.toBytes("fam1"); byte[][] families = { fam1 }; @@ -4960,7 +4962,8 @@ public class TestHRegion { // move the file of the primary region to the archive, simulating a compaction Collection storeFiles = primaryRegion.getStore(families[0]).getStorefiles(); primaryRegion.getRegionFileSystem().removeStoreFiles(Bytes.toString(families[0]), storeFiles); - Collection storeFileInfos = primaryRegion.getRegionFileSystem().getStoreFiles(families[0]); + Collection storeFileInfos = primaryRegion.getRegionFileSystem() + .getStoreFiles(families[0]); Assert.assertTrue(storeFileInfos == null || storeFileInfos.size() == 0); verifyData(secondaryRegion, 0, 1000, cq, families); @@ -4974,7 +4977,8 @@ public class TestHRegion { } } - private void putData(int startRow, int numRows, byte[] qf, byte[]... families) throws IOException { + private void putData(int startRow, int numRows, byte[] qf, byte[]... families) throws + IOException { putData(this.region, startRow, numRows, qf, families); } @@ -5657,7 +5661,8 @@ public class TestHRegion { currRow.clear(); hasNext = scanner.next(currRow); assertEquals(2, currRow.size()); - assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow.get(0).getRowLength(), row4, 0, + assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), + currRow.get(0).getRowLength(), row4, 0, row4.length)); assertTrue(hasNext); // 2. scan out "row3" (2 kv) -- 1.7.10.2 (Apple Git-33)