From cc164b3f6b975ab065932be8e0ca7a908e76a95d Mon Sep 17 00:00:00 2001 From: stack Date: Wed, 6 Jan 2016 13:07:35 -0800 Subject: [PATCH] Suggested patch. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Just adds the new high flush threshold without disturbing or changing meaning of old settings/configurations. Removes all notion of fluctuating and active from Region space. These concepts are internal affairs of Store and leak out of the incoming new in-memory flushing Store. The only 'size' externalized by the Store is its memstore size, as it used to. Notion is that the Store would manage how this size is done keeping account of memstore content and outstanding segments and snapshot. The region now does a flush if we hit the old flush threshold and will flush again when we hit the new high threshold. But this doesn't seem right, or it does not seem enough unless I am missing something -- not in this patch nor in the patch originally attached to this issue. Don't we need to distinguish the flush types? Otherwise the high threshold gets ignored because low threshold has already been queued. Minor stuff is that we pass in the size when we do requestFlushIfNeeded rather than go fetch it again from the atomic (minor perf concern). Changed name of the RegionServiceProxy to be an Interface named RegionServicesForStores. I don't actually use it in this patch because it is incomplete. Was going to just have Region implement it for now. 
--- .../org/apache/hadoop/hbase/util/ClassSize.java | 4 + .../hadoop/hbase/regionserver/DefaultMemStore.java | 24 ++-- .../hadoop/hbase/regionserver/HMobStore.java | 3 + .../apache/hadoop/hbase/regionserver/HRegion.java | 144 +++++++++++++-------- .../apache/hadoop/hbase/regionserver/HStore.java | 30 +++-- .../apache/hadoop/hbase/regionserver/MemStore.java | 9 +- .../apache/hadoop/hbase/regionserver/Region.java | 9 +- .../regionserver/RegionServicesForStores.java | 38 ++++++ .../apache/hadoop/hbase/regionserver/Store.java | 13 +- .../hbase/regionserver/StoreConfigInformation.java | 2 + .../org/apache/hadoop/hbase/TestIOFencing.java | 4 +- .../hadoop/hbase/regionserver/TestHRegion.java | 119 +++++++++-------- 12 files changed, 257 insertions(+), 142 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java index 77acf9b..fdd0fae 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ClassSize.java @@ -110,6 +110,8 @@ public class ClassSize { /** Overhead for CellSkipListSet */ public static final int CELL_SKIPLIST_SET; + public static final int STORE_SERVICES; + /* Are we running on jdk7? 
*/ private static final boolean JDK7; static { @@ -193,6 +195,8 @@ public class ClassSize { TIMERANGE_TRACKER = align(ClassSize.OBJECT + Bytes.SIZEOF_LONG * 2); CELL_SKIPLIST_SET = align(OBJECT + REFERENCE); + + STORE_SERVICES = align(OBJECT + REFERENCE + ATOMIC_LONG); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java index 89ae0d1..cec06d5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java @@ -19,16 +19,6 @@ package org.apache.hadoop.hbase.regionserver; -import java.lang.management.ManagementFactory; -import java.lang.management.RuntimeMXBean; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.NavigableSet; -import java.util.SortedSet; -import java.util.concurrent.atomic.AtomicLong; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -49,6 +39,16 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.htrace.Trace; +import java.lang.management.ManagementFactory; +import java.lang.management.RuntimeMXBean; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NavigableSet; +import java.util.SortedSet; +import java.util.concurrent.atomic.AtomicLong; + /** * The MemStore holds in-memory modifications to the Store. Modifications * are {@link Cell}s. 
When asked to flush, current memstore is moved @@ -948,6 +948,10 @@ public class DefaultMemStore implements MemStore { return heapSize(); } + @Override + public void finalizeFlush() { + } + /** * Code to help figure if our approximation of object heap sizes is close * enough. See hbase-900. Fills memstores then waits so user can heap diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java index faf6d81..403422d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java @@ -511,6 +511,9 @@ public class HMobStore extends HStore { } } + @Override public void finalizeFlush() { + } + public void updateCellsCountCompactedToMob(long count) { cellsCountCompactedToMob += count; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index ccf2eb0..9316d90 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -18,6 +18,20 @@ */ package org.apache.hadoop.hbase.regionserver; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.io.Closeables; +import com.google.protobuf.ByteString; +import com.google.protobuf.Descriptors; +import com.google.protobuf.Message; +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcController; +import com.google.protobuf.Service; +import com.google.protobuf.TextFormat; + import java.io.EOFException; import java.io.FileNotFoundException; import java.io.IOException; @@ 
-185,20 +199,6 @@ import org.apache.hadoop.util.StringUtils; import org.apache.htrace.Trace; import org.apache.htrace.TraceScope; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.io.Closeables; -import com.google.protobuf.ByteString; -import com.google.protobuf.Descriptors; -import com.google.protobuf.Message; -import com.google.protobuf.RpcCallback; -import com.google.protobuf.RpcController; -import com.google.protobuf.Service; -import com.google.protobuf.TextFormat; - @InterfaceAudience.Private public class HRegion implements HeapSize, PropagatingConfigurationObserver, Region { private static final Log LOG = LogFactory.getLog(HRegion.class); @@ -561,7 +561,26 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi final WriteState writestate = new WriteState(); - long memstoreFlushSize; + /** + * Stores now get two flush signals; there is the old + * flush-because-the-store-has-exceeded-flush-size, + * {@link #memstoreFlushSize} and now a new high-water "force flush" mark + * {@link #memStoreFlushSizeHighThreshold}. How the Store internally makes use of these signals + * is up to the implementation. A Store may choose to act on the first signal flushing all to + * disk and ignore the (new) high threshold force flush (this is the 'old' and current default + * Store behavior) or it may treat the first signal as a 'warning' and do + * cleanup 'compacting' or pruning of its memory content on receipt of the low-water, flush + * signal and wait until it gets the high-water flush before it dumps to disk. + * + * memstoreFlushSize is set to the HTableDescriptor specified flush size or if unset, to + * HConstants.HREGION_MEMSTORE_FLUSH_SIZE. 
memStoreFlushSizeHighThreshold is set to mid-point + * between memstoreFlushSizeLowThreshold and blockingMemStoreSize + * (i.e. HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER * memstoreFlushSizeLowThreshold) + */ + private long memstoreFlushSize; + private long memStoreFlushSizeHighThreshold; + private long blockingMemStoreSize; + final long timestampSlop; final long rowProcessorTimeout; @@ -574,7 +593,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi private long flushCheckInterval; // flushPerChanges is to prevent too many changes in memstore private long flushPerChanges; - private long blockingMemStoreSize; final long threadWakeFrequency; // Used to guard closes final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); @@ -760,6 +778,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi this.blockingMemStoreSize = this.memstoreFlushSize * conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER); + // Set the high threshold/force flush size to be between flush size and blocking size + this.memStoreFlushSizeHighThreshold = + (this.memstoreFlushSize + this.blockingMemStoreSize) / 2; + + } + + long getMemstoreFlushSize() { + return this.memstoreFlushSize; } /** @@ -1029,6 +1055,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return false; } + public void blockUpdates() { + this.updatesLock.writeLock().lock(); + } + + public void unblockUpdates() { + this.updatesLock.writeLock().unlock(); + } + @Override public HDFSBlocksDistribution getHDFSBlocksDistribution() { HDFSBlocksDistribution hdfsBlocksDistribution = @@ -2510,6 +2544,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } // If we get to here, the HStores have been written. 
+ for(Store storeToFlush :storesToFlush) { + storeToFlush.finalizeFlush(); + } if (wal != null) { wal.completeCacheFlush(this.getRegionInfo().getEncodedNameAsBytes()); } @@ -2915,10 +2952,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi initialized = true; } long addedSize = doMiniBatchMutation(batchOp); - long newSize = this.addAndGetGlobalMemstoreSize(addedSize); - if (isFlushSize(newSize)) { - requestFlush(); - } + requestFlushIfNeeded(this.addAndGetGlobalMemstoreSize(addedSize)); } } finally { closeRegionOperation(op); @@ -3896,12 +3930,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } - /* - * @param size - * @return True if size is over the flush threshold - */ - private boolean isFlushSize(final long size) { - return size > this.memstoreFlushSize; + private void requestFlushIfNeeded(final long size) throws RegionTooBusyException { + if (size < this.memstoreFlushSize) return; + if (size > this.memStoreFlushSizeHighThreshold) requestFlush(/*TODO Pass ENUM HIGH_THRESHOLD*/); + else if (size > this.memstoreFlushSize) requestFlush(/*TODO Pass ENUM LOW_THRESHOLD*/); } /** @@ -5050,7 +5082,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (this.rsAccounting != null) { rsAccounting.addAndGetRegionReplayEditsSize(getRegionInfo().getRegionName(), kvSize); } - return isFlushSize(this.addAndGetGlobalMemstoreSize(kvSize)); + return this.addAndGetGlobalMemstoreSize(kvSize) > this.memstoreFlushSize; } /* @@ -5280,7 +5312,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi long c = count.decrementAndGet(); if (c <= 0) { synchronized (lock) { - if (count.get() <= 0 ){ + if (count.get() <= 0){ usable.set(false); RowLockContext removed = lockedRows.remove(row); assert removed == this: "we should never remove a different context"; @@ -6084,8 +6116,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, 
Regi } protected boolean isStopRow(Cell currentRowCell) { - return currentRowCell == null - || (stopRow != null && comparator.compareRows(currentRowCell, stopRow, 0, stopRow.length) >= isScan); + return currentRowCell == null || + (stopRow != null && + comparator.compareRows(currentRowCell, stopRow, 0, stopRow.length) >= isScan); } @Override @@ -7010,9 +7043,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } finally { closeRegionOperation(); - if (!mutations.isEmpty() && - isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize))) { - requestFlush(); + if (!mutations.isEmpty()) { + requestFlushIfNeeded(this.addAndGetGlobalMemstoreSize(addedSize)); } } } @@ -7114,7 +7146,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi byte[] row = mutate.getRow(); checkRow(row, op.toString()); checkFamilies(mutate.getFamilyCellMap().keySet()); - boolean flush = false; Durability durability = getEffectiveDurability(mutate.getDurability()); boolean writeToWAL = durability != Durability.SKIP_WAL; WALEdit walEdits = null; @@ -7294,8 +7325,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi allKVs.addAll(entry.getValue()); } - size = this.addAndGetGlobalMemstoreSize(size); - flush = isFlushSize(size); + this.addAndGetGlobalMemstoreSize(size); } } finally { this.updatesLock.readLock().unlock(); @@ -7329,10 +7359,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi this.metricsRegion.updateAppend(); } - if (flush) { - // Request a cache flush. Do it outside update lock. - requestFlush(); - } + // Request a cache flush. Do it outside update lock. + requestFlushIfNeeded(this.memstoreSize.get()); return mutate.isReturnResults() ? 
Result.create(allKVs) : null; } @@ -7356,7 +7384,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi byte [] row = mutation.getRow(); checkRow(row, op.toString()); checkFamilies(mutation.getFamilyCellMap().keySet()); - boolean flush = false; Durability durability = getEffectiveDurability(mutation.getDurability()); boolean writeToWAL = durability != Durability.SKIP_WAL; WALEdit walEdits = null; @@ -7520,8 +7547,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } } - size = this.addAndGetGlobalMemstoreSize(size); - flush = isFlushSize(size); + this.addAndGetGlobalMemstoreSize(size); } } finally { this.updatesLock.readLock().unlock(); @@ -7554,10 +7580,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } - if (flush) { - // Request a cache flush. Do it outside update lock. - requestFlush(); - } + // Request a cache flush. Do it outside update lock. + requestFlushIfNeeded(this.memstoreSize.get()); return mutation.isReturnResults() ? 
Result.create(allKVs) : null; } @@ -7577,8 +7601,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi public static final long FIXED_OVERHEAD = ClassSize.align( ClassSize.OBJECT + ClassSize.ARRAY + - 44 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT + - (14 * Bytes.SIZEOF_LONG) + + 45 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT + + (15 * Bytes.SIZEOF_LONG) + 5 * Bytes.SIZEOF_BOOLEAN); // woefully out of date - currently missing: @@ -7602,6 +7626,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi MultiVersionConcurrencyControl.FIXED_SIZE // mvcc + ClassSize.TREEMAP // maxSeqIdInStores + 2 * ClassSize.ATOMIC_INTEGER // majorInProgress, minorInProgress + + ClassSize.STORE_SERVICES // store services ; @Override @@ -8210,7 +8235,24 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi : CellComparator.COMPARATOR; } - public long getMemstoreFlushSize() { - return this.memstoreFlushSize; + //// method for debugging tests + public void throwException(String title, String regionName) { + String msg = title + ", "; + msg += getRegionInfo().toString(); + msg += getRegionInfo().isMetaRegion() ? " meta region " : " "; + msg += getRegionInfo().isMetaTable() ? 
" meta table " : " "; + msg += "stores: "; + for (Store s : getStores()) { + msg += s.getFamily().getNameAsString(); + msg += " size: "; + msg += s.getMemStoreSize(); + msg += " "; + } + msg += "end-of-stores"; + msg += ", memstore size "; + msg += getMemstoreSize(); + if (getRegionInfo().getRegionNameAsString().startsWith(regionName)) { + throw new RuntimeException(msg); + } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index badbd65..acbb8aa 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -43,6 +43,12 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantReadWriteLock; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableCollection; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -94,13 +100,6 @@ import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableCollection; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; - /** * A Store holds a column family in a Region. Its a memstore and a set of zero * or more StoreFiles, which stretch backwards over time. 
@@ -347,12 +346,6 @@ public class HStore implements Store { } @Override - public long getMemstoreFlushSize() { - // TODO: Why is this in here? The flushsize of the region rather than the store? St.Ack - return this.region.memstoreFlushSize; - } - - @Override public long getFlushableSize() { return this.memstore.getFlushableSize(); } @@ -2317,6 +2310,10 @@ public class HStore implements Store { removeCompactedFiles(copyCompactedfiles); } + @Override public void finalizeFlush() { + memstore.finalizeFlush(); + } + private ThreadPoolExecutor getThreadPoolExecutor(int maxThreads) { return Threads.getBoundedCachedThreadPool(maxThreads, maxThreads * 3, TimeUnit.SECONDS, new ThreadFactory() { @@ -2402,4 +2399,9 @@ public class HStore implements Store { } } } -} + + @Override + public long getMemstoreFlushSize() { + return this.region.getMemstoreFlushSize(); + } +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java index e9f8103..3e8fab8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java @@ -19,8 +19,8 @@ package org.apache.hadoop.hbase.regionserver; import java.util.List; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.HeapSize; /** @@ -134,4 +134,11 @@ public interface MemStore extends HeapSize { * @return Total memory occupied by this MemStore. */ long size(); + + /** + * This method is called when it is clear that the flush to disk is completed. + * The store may do any post-flush actions at this point. + * One example is to update the wal with sequence number that is known only at the store level. 
+ */ + void finalizeFlush(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java index 6d87057..371beba 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java @@ -22,6 +22,10 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.Message; +import com.google.protobuf.RpcController; +import com.google.protobuf.Service; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.HBaseInterfaceAudience; @@ -49,11 +53,6 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServic import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay; -import com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.Message; -import com.google.protobuf.RpcController; -import com.google.protobuf.Service; - /** * Regions store data for a certain region of a table. It stores all columns * for each row. A given table consists of one or more Regions. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java new file mode 100644 index 0000000..93a8c0d --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java @@ -0,0 +1,38 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * Services a Store needs from a Region. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public interface RegionServicesForStores { + void blockUpdates(); + void unblockUpdates(); + long addAndGetGlobalMemstoreSize(long size); + long addAndGetGlobalMemstoreFluctuatingSize(long size); + long getGlobalMemstoreActiveSize(); + long getWalSequenceId() throws IOException; +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index 8bb10f0..69312e7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -22,8 +22,6 @@ import java.util.Collection; import java.util.List; import java.util.NavigableSet; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; @@ -32,6 +30,8 @@ import 
org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver; import org.apache.hadoop.hbase.io.HeapSize; @@ -490,4 +490,11 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf * Closes and archives the compacted files under this store */ void closeAndArchiveCompactedFiles() throws IOException; -} + + /** + * This method is called when it is clear that the flush to disk is completed. + * The store may do any post-flush actions at this point. + * One example is to update the wal with sequence number that is known only at the store level. + */ + void finalizeFlush(); +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreConfigInformation.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreConfigInformation.java index d07bded..47d730e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreConfigInformation.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreConfigInformation.java @@ -35,6 +35,8 @@ public interface StoreConfigInformation { */ // TODO: Why is this in here? It should be in Store and it should return the Store flush size, // not the Regions. St.Ack + // This is here for use by CompactionConfiguration! It is crazy that we have an Interface + // implemented by a Store so compactions can get a Region-level setting. 
long getMemstoreFlushSize(); /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java index 94a63d8..e90e315 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java @@ -31,7 +31,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor; @@ -207,6 +206,9 @@ public class TestIOFencing { } super.completeCompaction(compactedFiles); } + + @Override public void finalizeFlush() { + } } private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 4582e31..db5ed98 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -19,53 +19,10 @@ package org.apache.hadoop.hbase.regionserver; -import static org.apache.hadoop.hbase.HBaseTestingUtility.COLUMNS; -import static org.apache.hadoop.hbase.HBaseTestingUtility.FIRST_CHAR; -import static org.apache.hadoop.hbase.HBaseTestingUtility.LAST_CHAR; -import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY; -import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1; -import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2; -import static org.apache.hadoop.hbase.HBaseTestingUtility.fam3; -import static org.junit.Assert.assertArrayEquals; -import static 
org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyBoolean; -import static org.mockito.Matchers.anyLong; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import java.io.IOException; -import java.io.InterruptedIOException; -import java.security.PrivilegedExceptionAction; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.NavigableMap; -import java.util.TreeMap; -import java.util.UUID; -import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; - +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.protobuf.ByteString; import org.apache.commons.lang.RandomStringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -176,10 +133,52 @@ import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; 
-import com.google.protobuf.ByteString; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.hadoop.hbase.HBaseTestingUtility.COLUMNS; +import static org.apache.hadoop.hbase.HBaseTestingUtility.FIRST_CHAR; +import static org.apache.hadoop.hbase.HBaseTestingUtility.LAST_CHAR; +import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY; +import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1; +import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2; +import static org.apache.hadoop.hbase.HBaseTestingUtility.fam3; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static 
org.mockito.Mockito.when; /** * Basic stand-alone testing of HRegion. No clusters! @@ -1289,7 +1288,8 @@ public class TestHRegion { private final AtomicInteger count; private Exception e; - GetTillDoneOrException(final int i, final byte[] r, final AtomicBoolean d, final AtomicInteger c) { + GetTillDoneOrException(final int i, final byte[] r, final AtomicBoolean d, final + AtomicInteger c) { super("getter." + i); this.g = new Get(r); this.done = d; @@ -2782,7 +2782,8 @@ public class TestHRegion { } catch (NotServingRegionException e) { // this is the correct exception that is expected } catch (IOException e) { - fail("Got wrong type of exception - should be a NotServingRegionException, but was an IOException: " + fail("Got wrong type of exception - should be a NotServingRegionException, but was an " + + "IOException: " + e.getMessage()); } } finally { @@ -2980,7 +2981,8 @@ public class TestHRegion { } @Test - public void testScanner_ExplicitColumns_FromMemStoreAndFiles_EnforceVersions() throws IOException { + public void testScanner_ExplicitColumns_FromMemStoreAndFiles_EnforceVersions() throws + IOException { byte[] row1 = Bytes.toBytes("row1"); byte[] fam1 = Bytes.toBytes("fam1"); byte[][] families = { fam1 }; @@ -4978,7 +4980,8 @@ public class TestHRegion { // move the file of the primary region to the archive, simulating a compaction Collection storeFiles = primaryRegion.getStore(families[0]).getStorefiles(); primaryRegion.getRegionFileSystem().removeStoreFiles(Bytes.toString(families[0]), storeFiles); - Collection storeFileInfos = primaryRegion.getRegionFileSystem().getStoreFiles(families[0]); + Collection storeFileInfos = primaryRegion.getRegionFileSystem() + .getStoreFiles(families[0]); Assert.assertTrue(storeFileInfos == null || storeFileInfos.size() == 0); verifyData(secondaryRegion, 0, 1000, cq, families); @@ -4992,7 +4995,8 @@ public class TestHRegion { } } - private void putData(int startRow, int numRows, byte[] qf, byte[]... 
families) throws IOException { + private void putData(int startRow, int numRows, byte[] qf, byte[]... families) throws + IOException { putData(this.region, startRow, numRows, qf, families); } @@ -5675,7 +5679,8 @@ public class TestHRegion { currRow.clear(); hasNext = scanner.next(currRow); assertEquals(2, currRow.size()); - assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow.get(0).getRowLength(), row4, 0, + assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), + currRow.get(0).getRowLength(), row4, 0, row4.length)); assertTrue(hasNext); // 2. scan out "row3" (2 kv) -- 2.6.1