From 2df26fba36c7f67e587637f81c2a6b376275f6d7 Mon Sep 17 00:00:00 2001
From: zhangduo
Date: Wed, 29 Oct 2014 10:47:28 +0800
Subject: [PATCH] HBASE-10201 Port 'Make flush decisions per column family' to
 trunk

---
 .../org/apache/hadoop/hbase/HTableDescriptor.java  |  25 +-
 .../java/org/apache/hadoop/hbase/HConstants.java   |  15 +
 hbase-common/src/main/resources/hbase-default.xml  |  20 +
 .../hbase/regionserver/DefaultStoreFlusher.java    |   2 +-
 .../hadoop/hbase/regionserver/FlushRequester.java  |  12 +-
 .../apache/hadoop/hbase/regionserver/HRegion.java  | 334 ++++++++---
 .../hadoop/hbase/regionserver/HRegionServer.java   |   2 +-
 .../apache/hadoop/hbase/regionserver/HStore.java   |  13 +-
 .../hadoop/hbase/regionserver/LogRoller.java       |   5 +-
 .../hadoop/hbase/regionserver/MemStoreFlusher.java |  54 +-
 .../hadoop/hbase/regionserver/RSRpcServices.java   |   4 +-
 .../hadoop/hbase/regionserver/wal/FSHLog.java      |  64 ++-
 .../apache/hadoop/hbase/regionserver/wal/HLog.java |   7 +-
 .../org/apache/hadoop/hbase/TestIOFencing.java     |   4 +-
 .../hadoop/hbase/TestPerColumnFamilyFlush.java     | 626 +++++++++++++++++++++
 .../hbase/regionserver/TestFlushRegionEntry.java   |   4 +-
 .../hbase/regionserver/TestHeapMemoryManager.java  |  16 +-
 .../hadoop/hbase/regionserver/wal/TestHLog.java    |  31 +-
 .../hbase/regionserver/wal/TestWALReplay.java      |  11 +-
 19 files changed, 1092 insertions(+), 157 deletions(-)
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/TestPerColumnFamilyFlush.java

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
index cff5a40..7b36b82 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
@@ -142,6 +142,12 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> {
   public static final String MEMSTORE_FLUSHSIZE = "MEMSTORE_FLUSHSIZE";
   private static final ImmutableBytesWritable MEMSTORE_FLUSHSIZE_KEY =
       new ImmutableBytesWritable(Bytes.toBytes(MEMSTORE_FLUSHSIZE));
+
+  public static final String MEMSTORE_COLUMNFAMILY_FLUSHSIZE_LOWER_BOUND =
+      "MEMSTORE_COLUMN_FAMILY_FLUSHSIZE_LOWER_BOUND";
+
+  private static final ImmutableBytesWritable MEMSTORE_COLUMNFAMILY_FLUSHSIZE_LOWER_BOUND_KEY =
+      new ImmutableBytesWritable(Bytes.toBytes(MEMSTORE_COLUMNFAMILY_FLUSHSIZE_LOWER_BOUND));

   /**
    * INTERNAL Used by rest interface to access this metadata
@@ -221,6 +227,8 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> {
    */
   public static final long DEFAULT_MEMSTORE_FLUSH_SIZE = 1024*1024*128L;

+  public static final long DEFAULT_MEMSTORE_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND = 1024 * 1024 * 16L;
+
   public static final int DEFAULT_REGION_REPLICATION = 1;

   private final static Map<String, String> DEFAULT_VALUES
@@ -777,6 +785,21 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> {
     return this;
   }

+  public long getMemStoreColumnFamilyFlushSize() {
+    byte [] value = getValue(MEMSTORE_COLUMNFAMILY_FLUSHSIZE_LOWER_BOUND_KEY);
+    if (value != null) {
+      return Long.parseLong(Bytes.toString(value));
+    }
+    return -1;
+  }
+
+  public HTableDescriptor setMemStoreColumnFamilyFlushSize(
+      long memstoreColumnFamilyFlushSize) {
+    setValue(MEMSTORE_COLUMNFAMILY_FLUSHSIZE_LOWER_BOUND_KEY,
+        Long.toString(memstoreColumnFamilyFlushSize));
+    return this;
+  }
+
   /**
    * Adds a column family.
    * @param family HColumnDescriptor of family to add.
@@ -1042,7 +1065,7 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> {
    * This compares the content of the two descriptors and not the reference.
    *
    * @return 0 if the contents of the descriptors are exactly matching,
-   * 		 1 if there is a mismatch in the contents
+   *         1 if there is a mismatch in the contents
    */
   @Override
   public int compareTo(final HTableDescriptor other) {
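For reviewers, a minimal sketch of how the new table-level knob composes with the existing flush size (a sketch only; the table and family names are made up, and the sizes shown are the defaults this patch wires in):

    // Hypothetical setup code; setMemStoreColumnFamilyFlushSize is the
    // setter added by this hunk, setMemStoreFlushSize already exists.
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;

    public class PerCfFlushTableSetup {
      public static HTableDescriptor describe() {
        HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("demo"));
        htd.addFamily(new HColumnDescriptor("f1"));
        htd.addFamily(new HColumnDescriptor("f2"));
        // Region-wide flush threshold (128 MB is the default).
        htd.setMemStoreFlushSize(128 * 1024 * 1024L);
        // Per-column-family selection threshold; stores above this size are
        // flushed selectively. Falls back to the site-wide lower bound
        // (or the 16 MB default) when unset or <= 0.
        htd.setMemStoreColumnFamilyFlushSize(16 * 1024 * 1024L);
        return htd;
      }
    }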
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index ba152c0..b0be8e6 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -339,6 +339,21 @@ public final class HConstants {
   public static final String HREGION_MEMSTORE_FLUSH_SIZE =
       "hbase.hregion.memstore.flush.size";

+  /** Conf key for enabling Per Column Family flushing of memstores */
+  public static final String HREGION_MEMSTORE_PER_COLUMN_FAMILY_FLUSH =
+      "hbase.hregion.memstore.percolumnfamilyflush.enabled";
+
+  /** Default value for the Per Column Family flush knob */
+  public static final Boolean DEFAULT_HREGION_MEMSTORE_PER_COLUMN_FAMILY_FLUSH =
+      true;
+
+  /**
+   * If Per Column Family flushing is enabled, this is the minimum size
+   * at which a column family's memstore is flushed.
+   */
+  public static final String HREGION_MEMSTORE_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND =
+      "hbase.hregion.memstore.percolumnfamilyflush.flush.size.lower.bound";
+
   public static final String HREGION_EDITS_REPLAY_SKIP_ERRORS =
       "hbase.hregion.edits.replay.skip.errors";

diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index 15a53dc..da5484c 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -596,6 +596,26 @@ possible configurations would overwhelm and obscure the important.
     every hbase.server.thread.wakefrequency.</description>
   </property>
   <property>
+    <name>hbase.hregion.memstore.percolumnfamilyflush.enabled</name>
+    <value>true</value>
+    <description>
+      whether to turn on per column family flushing.
+    </description>
+  </property>
+  <property>
+    <name>hbase.hregion.memstore.percolumnfamilyflush.flush.size.lower.bound</name>
+    <value>16777216</value>
+    <description>
+      If per column family flushing is turned on, then every time that we hit the
+      total memstore limit, we find out all the column families whose memstores
+      exceed this value, and only flush them, while retaining the others whose
+      memstores are lower than this limit. If none of the families have their
+      memstore size more than this, all the memstores will be flushed
+      (just as usual). This value should be less than half of the total memstore
+      threshold (hbase.hregion.memstore.flush.size).
+    </description>
+  </property>
+  <property>
    <name>hbase.hregion.preclose.flush.size</name>
    <value>5242880</value>
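A sketch of enabling the feature cluster-wide through Configuration, using the two keys introduced above (values are illustrative; keep the lower bound well under half of hbase.hregion.memstore.flush.size, as the description says):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class PerCfFlushConf {
      public static Configuration create() {
        Configuration conf = HBaseConfiguration.create();
        conf.setBoolean("hbase.hregion.memstore.percolumnfamilyflush.enabled", true);
        // 16 MB, the shipped default for the selection lower bound.
        conf.setLong(
            "hbase.hregion.memstore.percolumnfamilyflush.flush.size.lower.bound",
            16 * 1024 * 1024L);
        return conf;
      }
    }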
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
index 73b8cb9..7035777 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
@@ -85,7 +85,7 @@ public class DefaultStoreFlusher extends StoreFlusher {
       scanner.close();
     }
     LOG.info("Flushed, sequenceid=" + cacheFlushId +", memsize="
-        + StringUtils.humanReadableInt(snapshot.getSize()) +
+        + StringUtils.byteDesc(snapshot.getSize()) +
         ", hasBloomFilter=" + writer.hasGeneralBloom() +
         ", into tmp file " + writer.getPath());
     result.add(writer.getPath());
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java
index e1c3144..1a80e6c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java
@@ -30,15 +30,23 @@ public interface FlushRequester {
    * Tell the listener the cache needs to be flushed.
    *
    * @param region the HRegion requesting the cache flush
+   * @param selectiveFlushRequest is this a selective flush request? This means
+   *                              that if some column families are dominating
+   *                              the memstore size, only those column families
+   *                              would be flushed.
    */
-  void requestFlush(HRegion region);
+  void requestFlush(HRegion region, boolean selectiveFlushRequest);

  /**
   * Tell the listener the cache needs to be flushed after a delay
   *
   * @param region the HRegion requesting the cache flush
   * @param delay after how much time should the flush happen
+   * @param selectiveFlushRequest is this a selective flush request? This means
+   *                              that if some column families are dominating
+   *                              the memstore size, only those column families
+   *                              would be flushed.
   */
-  void requestDelayedFlush(HRegion region, long delay);
+  void requestDelayedFlush(HRegion region, long delay, boolean selectiveFlushRequest);

  /**
   * Register a FlushRequestListener
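A sketch of how a caller threads the new flag through, mirroring the HRegionServer and LogRoller call sites later in this patch (the wrapper class and delay value are hypothetical; FlushRequester and HRegion are the real types):

    import org.apache.hadoop.hbase.regionserver.FlushRequester;
    import org.apache.hadoop.hbase.regionserver.HRegion;

    class FlushRequestCaller {
      static void request(FlushRequester requester, HRegion region) {
        // Selective only when the region has per-CF flushing enabled.
        requester.requestFlush(region, region.isPerColumnFamilyFlushEnabled());
        // A log roll, by contrast, wants everything out, so it passes false:
        // requester.requestDelayedFlush(region, 20000, false);
      }
    }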
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 704e5c0..4aba8b5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -30,6 +30,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -41,6 +42,7 @@ import java.util.TreeMap;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
@@ -61,7 +63,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -86,6 +87,7 @@ import org.apache.hadoop.hbase.RegionTooBusyException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.backup.HFileArchiver;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
@@ -498,7 +500,23 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
   long memstoreFlushSize;
   final long timestampSlop;
   final long rowProcessorTimeout;
-  private volatile long lastFlushTime;
+
+  // The maximum size a column family's memstore can grow up to,
+  // before being flushed.
+  private long columnfamilyMemstoreFlushSize;
+
+  // Last flush time for each Store. Useful when we are flushing for each column
+  private final ConcurrentMap<Store, Long> lastStoreFlushTimeMap =
+      new ConcurrentHashMap<Store, Long>();
+
+  // keep track of oldest sequence id of edit in a store.
+  // TODO: dup with oldestUnflushedRegionSequenceIds in FSHLog.
+  private final ConcurrentMap<Store, AtomicLong> oldestSeqIdOfStore =
+      new ConcurrentHashMap<Store, AtomicLong>();
+
+  // Selective flushing of Column Families which dominate the memstore?
+  private final boolean perColumnFamilyFlushEnabled;
+
   final RegionServerServices rsServices;
   private RegionServerAccounting rsAccounting;
   private long flushCheckInterval;
@@ -602,7 +620,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
       throw new IllegalArgumentException(MEMSTORE_FLUSH_PER_CHANGES + " can not exceed "
           + MAX_FLUSH_PER_CHANGES);
     }
-
+    this.perColumnFamilyFlushEnabled = conf.getBoolean(
+        HConstants.HREGION_MEMSTORE_PER_COLUMN_FAMILY_FLUSH,
+        HConstants.DEFAULT_HREGION_MEMSTORE_PER_COLUMN_FAMILY_FLUSH);
+    LOG.debug("Per Column Family Flushing: " + perColumnFamilyFlushEnabled);
     this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration",
         DEFAULT_ROWLOCK_WAIT_DURATION);
@@ -684,6 +705,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     this.memstoreFlushSize = flushSize;
     this.blockingMemStoreSize = this.memstoreFlushSize *
         conf.getLong("hbase.hregion.memstore.block.multiplier", 2);
+
+    long columnfamilyFlushSize = this.htableDescriptor
+        .getMemStoreColumnFamilyFlushSize();
+    if (columnfamilyFlushSize <= 0) {
+      columnfamilyFlushSize = conf.getLong(
+          HConstants.HREGION_MEMSTORE_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND,
+          HTableDescriptor.DEFAULT_MEMSTORE_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND);
+    }
+    this.columnfamilyMemstoreFlushSize = columnfamilyFlushSize;
   }
@@ -761,7 +791,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     // Initialize split policy
     this.splitPolicy = RegionSplitPolicy.create(this, conf);
-    this.lastFlushTime = EnvironmentEdgeManager.currentTime();
+    long lastFlushTime = EnvironmentEdgeManager.currentTime();
+    for (Store store: stores.values()) {
+      this.lastStoreFlushTimeMap.put(store, lastFlushTime);
+    }
+    if (perColumnFamilyFlushEnabled) {
+      for (Store store: stores.values()) {
+        this.oldestSeqIdOfStore.put(store, new AtomicLong(Long.MAX_VALUE));
+      }
+    }
     // Use maximum of log sequenceid or that which was found in stores
     // (particularly if no recovered edits, seqid will be -1).
     long nextSeqid = maxSeqId + 1;
@@ -1289,10 +1327,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
       status.setStatus("Running coprocessor post-close hooks");
       this.coprocessorHost.postClose(abort);
     }
-    if ( this.metricsRegion != null) {
+    if (this.metricsRegion != null) {
       this.metricsRegion.close();
     }
-    if ( this.metricsRegionWrapper != null) {
+    if (this.metricsRegionWrapper != null) {
       Closeables.closeQuietly(this.metricsRegionWrapper);
     }
     status.markComplete("Closed");
@@ -1431,9 +1469,21 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     return this.fs;
   }

-  /** @return the last time the region was flushed */
-  public long getLastFlushTime() {
-    return this.lastFlushTime;
+  /**
+   * @return Returns the earliest time a store in the region was flushed. All
+   *         other stores in the region would have been flushed either at, or
+   *         after this time.
+   */
+  public long getEarliestFlushTimeForAllStores() {
+    return Collections.min(lastStoreFlushTimeMap.values());
+  }
+
+  /**
+   * @return Returns the latest time a store in the region was flushed, which is
+   *         the last flush time of this region.
+   */
+  public long getLatestFlushTimeForAllStores() {
+    return Collections.max(lastStoreFlushTimeMap.values());
   }

 //////////////////////////////////////////////////////////////////////////////
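The bookkeeping above in isolation, as a standalone sketch (the class is hypothetical; Store is replaced by a type parameter): one last-flush timestamp per store, with the region-level earliest/latest values derived exactly as the new getters do, via Collections.min/max.

    import java.util.Collections;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    class FlushTimeBook<S> {
      private final ConcurrentMap<S, Long> lastFlush = new ConcurrentHashMap<S, Long>();

      void recordFlush(S store, long when) {
        lastFlush.put(store, when);
      }

      // Every store was flushed at or after this instant.
      long earliest() {
        return Collections.min(lastFlush.values());
      }

      // The region's "last flush time" in the pre-patch sense.
      long latest() {
        return Collections.max(lastFlush.values());
      }
    }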
@@ -1462,6 +1512,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     return this.comparator;
   }

+  public boolean isPerColumnFamilyFlushEnabled() {
+    return perColumnFamilyFlushEnabled;
+  }
+
   /*
    * Do preparation for pending compaction.
    * @throws IOException
@@ -1596,6 +1650,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     }
   }

+  public FlushResult flushcache() throws IOException {
+    return flushcache(false);
+  }
   /**
    * Flush the cache.
    *
@@ -1609,20 +1666,28 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
    *
    * <p>This method may block for some time, so it should not be called from a
    * time-sensitive thread.
-   *
+   * @param selectiveFlushRequest If true, selectively flush column families
+   *                              which dominate the memstore size, provided it
+   *                              is enabled in the configuration.
    * @return true if the region needs compacting
    *
    * @throws IOException general io exceptions
    * @throws DroppedSnapshotException Thrown when replay of hlog is required
    * because a Snapshot was not properly persisted.
    */
-  public FlushResult flushcache() throws IOException {
+  public FlushResult flushcache(boolean selectiveFlushRequest) throws IOException {
     // fail-fast instead of waiting on the lock
     if (this.closing.get()) {
       String msg = "Skipping flush on " + this + " because closing";
       LOG.debug(msg);
       return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
     }
+    // If a selective flush was requested, but the per-column family switch is
+    // off, we cannot do a selective flush.
+    if (selectiveFlushRequest && !perColumnFamilyFlushEnabled) {
+      LOG.warn("Disabling selective flushing of Column Families' memstores.");
+      selectiveFlushRequest = false;
+    }
     MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this);
     status.setStatus("Acquiring readlock on region");
     // block waiting for the lock for flushing cache
@@ -1658,8 +1723,37 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
         return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
       }
     }
+    Collection<Store> specificStoresToFlush = null;
     try {
-      FlushResult fs = internalFlushcache(status);
+      // We now have to flush the region since it has
+      // reached the threshold, however, we might not need
+      // to flush the entire region. If there are certain
+      // column families that are dominating the memstore size,
+      // we will flush just those. The second behavior only
+      // happens when selectiveFlushRequest is true.
+      FlushResult fs;
+
+      // If it is okay to flush the memstore by selecting the
+      // column families which dominate the size, we are going
+      // to populate the specificStoresToFlush set.
+      if (selectiveFlushRequest) {
+        specificStoresToFlush = new HashSet<Store>();
+        for (Store store: stores.values()) {
+          if (shouldFlushStore(store)) {
+            specificStoresToFlush.add(store);
+            LOG.debug("Column Family: " + store.getColumnFamilyName() + " of region " + this
+                + " will be flushed");
+          }
+        }
+        // Didn't find any CFs which were above the threshold for selection.
+        if (specificStoresToFlush.size() == 0) {
+          LOG.debug("Since none of the CFs were above the size, flushing all.");
+          specificStoresToFlush = stores.values();
+        }
+      } else {
+        specificStoresToFlush = stores.values();
+      }
+      fs = internalFlushcache(this.log, -1, specificStoresToFlush, status);

       if (coprocessorHost != null) {
         status.setStatus("Running post-flush coprocessor hooks");
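The selection policy above, distilled into a self-contained sketch (SizedStore and FlushSelector are stand-ins, not patch types): families above the lower bound are chosen, and if none qualify the whole region is flushed as before.

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;

    interface SizedStore {
      long memStoreSize();
    }

    final class FlushSelector {
      static <S extends SizedStore> Collection<S> select(
          Collection<S> stores, long lowerBound) {
        List<S> picked = new ArrayList<S>();
        for (S s : stores) {
          if (s.memStoreSize() > lowerBound) {
            picked.add(s);
          }
        }
        // No family dominates: fall back to flushing all of them, as before.
        return picked.isEmpty() ? stores : picked;
      }
    }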
@@ -1681,6 +1775,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     }
   }

+  private boolean shouldFlushStore(Store store) {
+    return store.getMemStoreSize() > this.columnfamilyMemstoreFlushSize;
+  }
+
   /**
    * Should the memstore be flushed now
    */
@@ -1695,7 +1793,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     }
     long now = EnvironmentEdgeManager.currentTime();
     //if we flushed in the recent past, we don't need to do again now
-    if ((now - getLastFlushTime() < flushCheckInterval)) {
+    if ((now - getLatestFlushTimeForAllStores() < flushCheckInterval)) {
       return false;
     }
     //since we didn't flush in the recent past, flush now if certain conditions
@@ -1727,19 +1825,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
    */
   protected FlushResult internalFlushcache(MonitoredTask status)
       throws IOException {
-    return internalFlushcache(this.log, -1, status);
+    return internalFlushcache(this.log, -1, stores.values(), status);
   }

   /**
    * @param wal Null if we're NOT to go via hlog/wal.
    * @param myseqid The seqid to use if wal is null writing out flush file.
+   * @param storesToFlush The list of stores to flush.
    * @return object describing the flush's state
    * @throws IOException
    * @see #internalFlushcache(MonitoredTask)
    */
-  protected FlushResult internalFlushcache(
-      final HLog wal, final long myseqid, MonitoredTask status)
-      throws IOException {
+  protected FlushResult internalFlushcache(final HLog wal, final long myseqid,
+      Collection<Store> storesToFlush, MonitoredTask status) throws IOException {
     if (this.rsServices != null && this.rsServices.isAborted()) {
       // Don't flush when server aborting, it's unsafe
       throw new IOException("Aborting flush because server is aborted...");
@@ -1753,6 +1851,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     this.updatesLock.writeLock().lock();
     try {
       if (this.memstoreSize.get() <= 0) {
+        // Since there is nothing to flush, we reset the flush times for all
+        // the stores; the oldest sequence ids need no reset since there are no edits.
+        for (Store store: stores.values()) {
+          lastStoreFlushTimeMap.put(store, startTime);
+        }
         // Presume that if there are still no edits in the memstore, then
         // there are no edits for
         // this region out in the WAL/HLog subsystem so no need to do any
"" : "; wal is null, using passed sequenceid=" + myseqid)); + for (Store store: storesToFlush) { + LOG.info("Flushing Column Family: " + store.getColumnFamilyName() + " which was occupying " + + StringUtils.byteDesc(store.getMemStoreSize()) + " of memstore."); + } // Stop updates while we snapshot the memstore of all of these regions' stores. We only have // to do this for a moment. It is quick. We also set the memstore size to zero here before we // allow updates again so its value will represent the size of the updates received // during flush MultiVersionConsistencyControl.WriteEntry w = null; - // We have to take an update lock during snapshot, or else a write could end up in both snapshot // and memstore (makes it difficult to do atomic rows then) status.setStatus("Obtaining lock to block concurrent updates"); // block waiting for the lock for internal flush this.updatesLock.writeLock().lock(); - long totalFlushableSize = 0; status.setStatus("Preparing to flush by snapshotting stores in " + getRegionInfo().getEncodedName()); + long totalFlushableSizeOfFlushableStores = 0; + + Set storesNotToFlush = new HashSet(stores.values()); + storesNotToFlush.removeAll(storesToFlush); + + // Calculate the oldest LSN numbers for edits in the stores that will + // be flushed and the ones which won't be. This will be used to populate + // the firstSeqWrittenInCurrentMemstore and + // firstSeqWrittenInSnapshotMemstore maps correctly. + long oldestSeqIdInStoresToFlush = Long.MAX_VALUE; + if (!storesNotToFlush.isEmpty()) { + // no need to calculate this if we flush all stores. + for (Store store: storesToFlush) { + oldestSeqIdInStoresToFlush = Math.min(oldestSeqIdInStoresToFlush, + oldestSeqIdOfStore.get(store).get()); + } + } + + long oldestSeqIdInStoresNotToFlush = Long.MAX_VALUE; + for (Store store: storesNotToFlush) { + oldestSeqIdInStoresNotToFlush = Math.min(oldestSeqIdInStoresNotToFlush, + oldestSeqIdOfStore.get(store).get()); + } List storeFlushCtxs = new ArrayList(stores.size()); TreeMap> committedFiles = new TreeMap>( Bytes.BYTES_COMPARATOR); @@ -1814,26 +1942,29 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // try { w = mvcc.beginMemstoreInsert(); if (wal != null) { - if (!wal.startCacheFlush(this.getRegionInfo().getEncodedNameAsBytes())) { + flushSeqId = wal.startCacheFlush(this.getRegionInfo().getEncodedNameAsBytes(), + oldestSeqIdInStoresToFlush, oldestSeqIdInStoresNotToFlush, sequenceId); + if (flushSeqId == HLog.NO_SEQUENCE_ID) { // This should never happen. String msg = "Flush will not be started for [" + this.getRegionInfo().getEncodedName() + "] - because the WAL is closing."; status.setStatus(msg); return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg); } - // Get a sequence id that we can use to denote the flush. It will be one beyond the last - // edit that made it into the hfile (the below does not add an edit, it just asks the - // WAL system to return next sequence edit). - flushSeqId = getNextSequenceId(wal); } else { // use the provided sequence Id as WAL is not being used for this flush. 
@@ -1870,7 +2001,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
       this.updatesLock.writeLock().unlock();
     }
     String s = "Finished memstore snapshotting " + this +
-      ", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSize;
+      ", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSizeOfFlushableStores;
     status.setStatus(s);
     if (LOG.isTraceEnabled()) LOG.trace(s);
     // sync unflushed WAL changes
@@ -1920,8 +2051,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //

       // Switch snapshot (in memstore) -> new hfile (thus causing
       // all the store scanners to reset/reseek).
-      Iterator<Store> it = stores.values().iterator(); // stores.values() and storeFlushCtxs have
-      // same order
+      Iterator<Store> it = storesToFlush.iterator();
+      // stores.values() and storeFlushCtxs have same order
       for (StoreFlushContext flush : storeFlushCtxs) {
         boolean needsCompaction = flush.commit(status);
         if (needsCompaction) {
@@ -1932,7 +2063,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
       storeFlushCtxs.clear();

       // Set down the memstore size by amount of flush.
-      this.addAndGetGlobalMemstoreSize(-totalFlushableSize);
+      this.addAndGetGlobalMemstoreSize(-totalFlushableSizeOfFlushableStores);

       if (wal != null) {
         // write flush marker to WAL. If fail, we should throw DroppedSnapshotException
@@ -1974,7 +2105,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     }

     // Record latest flush time
-    this.lastFlushTime = EnvironmentEdgeManager.currentTime();
+    for (Store store: storesToFlush) {
+      this.lastStoreFlushTimeMap.put(store, startTime);
+    }

     // Update the last flushed sequence id for region. TODO: This is dup'd inside the WAL/FSHlog.
     this.lastFlushSeqId = flushSeqId;
@@ -1987,13 +2120,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //

     long time = EnvironmentEdgeManager.currentTime() - startTime;
     long memstoresize = this.memstoreSize.get();
-    String msg = "Finished memstore flush of ~" +
-      StringUtils.byteDesc(totalFlushableSize) + "/" + totalFlushableSize +
-      ", currentsize=" +
-      StringUtils.byteDesc(memstoresize) + "/" + memstoresize +
-      " for region " + this + " in " + time + "ms, sequenceid=" + flushSeqId +
-      ", compaction requested=" + compactionRequested +
-      ((wal == null)? "; wal=null": "");
+    String msg = "Finished memstore flush of ~"
+        + StringUtils.byteDesc(totalFlushableSizeOfFlushableStores) + "/"
+        + totalFlushableSizeOfFlushableStores + ", currentsize="
+        + StringUtils.byteDesc(memstoresize) + "/" + memstoresize
+        + " for region " + this + " in " + time + "ms, sequenceid="
+        + flushSeqId + ", compaction requested=" + compactionRequested
+        + ((wal == null) ? "; wal=null" : "");
     LOG.info(msg);
     status.setStatus(msg);
"; wal=null" : ""); LOG.info(msg); status.setStatus(msg); @@ -2129,7 +2262,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // if(delete.getFamilyCellMap().isEmpty()){ for(byte [] family : this.htableDescriptor.getFamiliesKeys()){ // Don't eat the timestamp - delete.deleteFamily(family, delete.getTimeStamp()); + delete.addFamily(family, delete.getTimeStamp()); } } else { for(byte [] family : delete.getFamilyCellMap().keySet()) { @@ -2492,6 +2625,33 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // } } } + + private void updateOldestSeqIdOfStore(Collection storesUpdated, + HLogKey walKey, long lowestPossibleSeqId) throws IOException { + List needToUpdate = new ArrayList(); + for (Store store: storesUpdated) { + AtomicLong oldestSeqId = oldestSeqIdOfStore.get(store); + if (lowestPossibleSeqId < oldestSeqId.get()) { + needToUpdate.add(oldestSeqId); + } + } + // normal case + if (needToUpdate.isEmpty()) { + return; + } + long seqId = walKey.getSequenceId(); + for (AtomicLong oldestSeqId: needToUpdate) { + for (;;) { + long prevSeqId = oldestSeqId.get(); + if (seqId >= prevSeqId) { + break; + } + if (oldestSeqId.compareAndSet(prevSeqId, seqId)) { + break; + } + } + } + } @SuppressWarnings("unchecked") private long doMiniBatchMutation(BatchOperationInProgress batchOp) throws IOException { @@ -2517,6 +2677,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // // reference family maps directly so coprocessors can mutate them if desired Map>[] familyMaps = new Map[batchOp.operations.length]; List memstoreCells = new ArrayList(); + Set storesUpdated = new HashSet(); + long lowestPossibleSeqId = sequenceId.get(); // We try to set up a batch in the range [firstIndex,lastIndexExclusive) int firstIndex = batchOp.nextIndexToProcess; int lastIndexExclusive = firstIndex; @@ -2678,7 +2840,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // continue; } doRollBackMemstore = true; // If we have a failure, we need to clean what we wrote - addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, memstoreCells); + addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, memstoreCells, storesUpdated); } // ------------------------------------ @@ -2755,8 +2917,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // walKey = this.appendNoSyncNoAppend(this.log, memstoreCells); } + // ------------------------------------------------------------------ + // STEP 6. Record oldest sequence id of memstore + // ------------------------------------------------------------------ + if (perColumnFamilyFlushEnabled) { + updateOldestSeqIdOfStore(storesUpdated, walKey, lowestPossibleSeqId); + } + // ------------------------------- - // STEP 6. Release row locks, etc. + // STEP 7. Release row locks, etc. // ------------------------------- if (locked) { this.updatesLock.readLock().unlock(); @@ -2765,7 +2934,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // releaseRowLocks(acquiredRowLocks); // ------------------------- - // STEP 7. Sync wal. + // STEP 8. Sync wal. // ------------------------- if (txid != 0) { syncOrDefer(txid, durability); @@ -2780,8 +2949,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // coprocessorHost.postBatchMutate(miniBatchOp); } + // ------------------------------------------------------------------ - // STEP 8. Advance mvcc. 
@@ -2517,6 +2677,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     // reference family maps directly so coprocessors can mutate them if desired
     Map<byte[], List<Cell>>[] familyMaps = new Map[batchOp.operations.length];
     List<Cell> memstoreCells = new ArrayList<Cell>();
+    Set<Store> storesUpdated = new HashSet<Store>();
+    long lowestPossibleSeqId = sequenceId.get();
     // We try to set up a batch in the range [firstIndex,lastIndexExclusive)
     int firstIndex = batchOp.nextIndexToProcess;
     int lastIndexExclusive = firstIndex;
@@ -2678,7 +2840,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
           continue;
         }
         doRollBackMemstore = true; // If we have a failure, we need to clean what we wrote
-        addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, memstoreCells);
+        addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, memstoreCells, storesUpdated);
       }

       // ------------------------------------
@@ -2755,8 +2917,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
         walKey = this.appendNoSyncNoAppend(this.log, memstoreCells);
       }

+      // ------------------------------------------------------------------
+      // STEP 6. Record oldest sequence id of memstore
+      // ------------------------------------------------------------------
+      if (perColumnFamilyFlushEnabled) {
+        updateOldestSeqIdOfStore(storesUpdated, walKey, lowestPossibleSeqId);
+      }
+
       // -------------------------------
-      // STEP 6. Release row locks, etc.
+      // STEP 7. Release row locks, etc.
       // -------------------------------
       if (locked) {
         this.updatesLock.readLock().unlock();
@@ -2765,7 +2934,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
       releaseRowLocks(acquiredRowLocks);

       // -------------------------
-      // STEP 7. Sync wal.
+      // STEP 8. Sync wal.
       // -------------------------
       if (txid != 0) {
         syncOrDefer(txid, durability);
@@ -2780,8 +2949,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
         coprocessorHost.postBatchMutate(miniBatchOp);
       }

+
       // ------------------------------------------------------------------
-      // STEP 8. Advance mvcc. This will make this put visible to scanners and getters.
+      // STEP 9. Advance mvcc. This will make this put visible to scanners and getters.
       // ------------------------------------------------------------------
       if (w != null) {
         mvcc.completeMemstoreInsertWithSeqNum(w, walKey);
@@ -2789,7 +2959,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
       }

       // ------------------------------------
-      // STEP 9. Run coprocessor post hooks. This should be done after the wal is
+      // STEP 10. Run coprocessor post hooks. This should be done after the wal is
       // synced so that the coprocessor contract is adhered to.
       // ------------------------------------
       if (!isInReplay && coprocessorHost != null) {
@@ -3117,8 +3287,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
    * We throw RegionTooBusyException if above memstore limit
    * and expect client to retry using some kind of backoff
   */
-  private void checkResources()
-    throws RegionTooBusyException {
+  private void checkResources() throws RegionTooBusyException {
     // If catalog region, do not impose resource constraints or block updates.
     if (this.getRegionInfo().isMetaRegion()) return;
@@ -3180,12 +3349,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
    * @param localizedWriteEntry The WriteEntry of the MVCC for this transaction.
    *        If null, then this method internally creates a mvcc transaction.
    * @param output newly added KVs into memstore
+   * @param storesUpdated used to track stores we modified
    * @return the additional memory usage of the memstore caused by the
    * new entries.
    * @throws IOException
    */
   private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap,
-    long mvccNum, List<Cell> memstoreCells) throws IOException {
+    long mvccNum, List<Cell> memstoreCells, Set<Store> storesUpdated) throws IOException {
     long size = 0;

     for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
@@ -3201,6 +3371,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
         size += ret.getFirst();
         memstoreCells.add(ret.getSecond());
       }
+      storesUpdated.add(store);
     }

     return size;
@@ -3308,7 +3479,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
         writestate.flushRequested = true;
       }
       // Make request outside of synchronize block; HBASE-818.
-      this.rsServices.getFlushRequester().requestFlush(this);
+      this.rsServices.getFlushRequester().requestFlush(this, perColumnFamilyFlushEnabled);
       if (LOG.isDebugEnabled()) {
         LOG.debug("Flush requested on " + this);
       }
@@ -3428,7 +3599,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     }
     if (seqid > minSeqIdForTheRegion) {
       // Then we added some edits to memory. Flush and cleanup split edit files.
-      internalFlushcache(null, seqid, status);
+      internalFlushcache(null, seqid, stores.values(), status);
     }
     // Now delete the content of recovered edits.  We're done w/ them.
     for (Path file: files) {
@@ -3569,7 +3740,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
             flush = restoreEdit(store, cell);
             editsCount++;
           }
-          if (flush) internalFlushcache(null, currentEditSeqId, status);
+          if (flush) internalFlushcache(null, currentEditSeqId, stores.values(), status);

           if (coprocessorHost != null) {
             coprocessorHost.postWALRestore(this.getRegionInfo(), key, val);
@@ -3908,7 +4079,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     // guaranteed to be one beyond the file made when we flushed (or if nothing to flush, it is
     // a sequence id that we can be sure is beyond the last hfile written).
     if (assignSeqId) {
-      FlushResult fs = this.flushcache();
+      FlushResult fs = this.flushcache(false);
       if (fs.isFlushSucceeded()) {
         seqId = fs.flushSequenceId;
       } else if (fs.result == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
@@ -4926,8 +5097,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     FileSystem fs = a.getRegionFileSystem().getFileSystem();

     // Make sure each region's cache is empty
-    a.flushcache();
-    b.flushcache();
+    a.flushcache(false);
+    b.flushcache(false);

     // Compact each region so we only have one store file per family
     a.compactStores(true);
@@ -5041,7 +5212,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //

     // do after lock
     if (this.metricsRegion != null) {
-      long totalSize = 0l;
+      long totalSize = 0L;
       for (Cell cell : results) {
         totalSize += CellUtil.estimatedLengthOf(cell);
       }
@@ -5117,6 +5288,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     startRegionOperation();
     WALEdit walEdit = new WALEdit();
+    Set<Store> storesUpdated = new HashSet<Store>();
+    long lowestPossibleSeqId = sequenceId.get();

     // 1. Run pre-process hook
     try {
@@ -5188,6 +5361,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
             Pair<Long, Cell> ret = store.add(cell);
             addedSize += ret.getFirst();
             memstoreCells.add(ret.getSecond());
+            storesUpdated.add(store);
           }
         }
@@ -5205,22 +5379,26 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
           // to get a sequence id assigned which is done by FSWALEntry#stampRegionSequenceId
           walKey = this.appendNoSyncNoAppend(this.log, memstoreCells);
         }
-
-        // 9. Release region lock
+        // 9. record oldest sequence id of memstore
+        if (perColumnFamilyFlushEnabled) {
+          updateOldestSeqIdOfStore(storesUpdated, walKey,
+              lowestPossibleSeqId);
+        }
+        // 10. Release region lock
         if (locked) {
           this.updatesLock.readLock().unlock();
           locked = false;
         }

-        // 10. Release row lock(s)
+        // 11. Release row lock(s)
         releaseRowLocks(acquiredRowLocks);

-        // 11. Sync edit log
+        // 12. Sync edit log
         if (txid != 0) {
           syncOrDefer(txid, getEffectiveDurability(processor.useDurability()));
         }
         walSyncSuccessful = true;
-        // 12. call postBatchMutate hook
+        // 13. call postBatchMutate hook
         processor.postBatchMutate(this);
       }
     } finally {
@@ -5235,7 +5413,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
         }
       }
     }
-    // 13. Roll mvcc forward
+    // 14. Roll mvcc forward
     if (writeEntry != null) {
       mvcc.completeMemstoreInsertWithSeqNum(writeEntry, walKey);
     }
@@ -5332,7 +5510,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     WALEdit walEdits = null;
     List<Cell> allKVs = new ArrayList<Cell>(append.size());
     Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();
-
+    Set<Store> storesUpdated = new HashSet<Store>();
+    long lowestPossibleSeqId = sequenceId.get();
     long size = 0;
     long txid = 0;
@@ -5467,6 +5646,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
               doRollBackMemstore = true;
             }
           }
+          storesUpdated.add(store);
           allKVs.addAll(entry.getValue());
         }
@@ -5486,7 +5666,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
           // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned
           walKey = this.appendNoSyncNoAppend(this.log, memstoreCells);
         }
-
+        // record oldest sequence id of memstore
+        if (perColumnFamilyFlushEnabled) {
+          updateOldestSeqIdOfStore(storesUpdated, walKey,
+              lowestPossibleSeqId);
+        }
         size = this.addAndGetGlobalMemstoreSize(size);
         flush = isFlushSize(size);
       } finally {
@@ -5548,7 +5732,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     WALEdit walEdits = null;
     List<Cell> allKVs = new ArrayList<Cell>(increment.size());
     Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();
-
+    Set<Store> storesUpdated = new HashSet<Store>();
+    long lowestPossibleSeqId = sequenceId.get();
+
     long size = 0;
     long txid = 0;
@@ -5687,6 +5873,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
               doRollBackMemstore = true;
             }
           }
+          storesUpdated.add(store);
         }
         size = this.addAndGetGlobalMemstoreSize(size);
         flush = isFlushSize(size);
@@ -5710,6 +5897,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
           // Append a faked WALEdit in order for SKIP_WAL updates to get mvccNum assigned
           walKey = this.appendNoSyncNoAppend(this.log, memstoreCells);
         }
+        // record oldest sequence id of memstore
+        if (perColumnFamilyFlushEnabled) {
+          updateOldestSeqIdOfStore(storesUpdated, walKey,
+              lowestPossibleSeqId);
+        }
       } finally {
         this.updatesLock.readLock().unlock();
       }
@@ -5763,7 +5955,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
   public static final long FIXED_OVERHEAD = ClassSize.align(
       ClassSize.OBJECT +
       ClassSize.ARRAY +
-      41 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
+      43 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
       (12 * Bytes.SIZEOF_LONG) +
       4 * Bytes.SIZEOF_BOOLEAN);
@@ -6328,6 +6520,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
   public Map<byte[], Long> getMaxStoreSeqIdForLogReplay() {
     return this.maxSeqIdInStores;
   }
+
+  public long getOldestSeqIdOfStore(byte[] column) {
+    return oldestSeqIdOfStore.get(getStore(column)).get();
+  }

   /**
    * @return if a given region is in compaction now.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 0b202d5..fabe534 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1458,7 +1458,7 @@ public class HRegionServer extends HasThread implements
           //Throttle the flushes by putting a delay. If we don't throttle, and there
           //is a balanced write-load on the regions in a table, we might end up
           //overwhelming the filesystem with too many flushes at once.
-          requester.requestDelayedFlush(r, randomDelay);
+          requester.requestDelayedFlush(r, randomDelay, r.isPerColumnFamilyFlushEnabled());
         }
       }
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 35b65eb..c883242 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -91,7 +91,6 @@ import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;

 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableCollection;
 import com.google.common.collect.ImmutableList;
@@ -924,7 +923,7 @@ public class HStore implements Store {
     if (LOG.isInfoEnabled()) {
       LOG.info("Added " + sf + ", entries=" + r.getEntries() +
         ", sequenceid=" + logCacheFlushId +
-        ", filesize=" + StringUtils.humanReadableInt(r.length()));
+        ", filesize=" + StringUtils.byteDesc(r.length()));
     }
     return sf;
   }
@@ -1147,7 +1146,7 @@ public class HStore implements Store {
     LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
         + this + " of " + this.getRegionInfo().getRegionNameAsString()
         + " into tmpdir=" + fs.getTempDir() + ", totalSize="
-        + StringUtils.humanReadableInt(cr.getSize()));
+        + StringUtils.byteDesc(cr.getSize()));

     long compactionStartTime = EnvironmentEdgeManager.currentTime();
     List<StoreFile> sfs = null;
@@ -1264,12 +1263,12 @@ public class HStore implements Store {
       for (StoreFile sf: sfs) {
         message.append(sf.getPath().getName());
         message.append("(size=");
-        message.append(StringUtils.humanReadableInt(sf.getReader().length()));
+        message.append(StringUtils.byteDesc(sf.getReader().length()));
         message.append("), ");
       }
     }
     message.append("total size for store is ")
-      .append(StringUtils.humanReadableInt(storeSize))
+      .append(StringUtils.byteDesc(storeSize))
       .append(". This selection was in queue for ")
       .append(StringUtils.formatTimeDiff(compactionStartTime, cr.getSelectionTime()))
       .append(", and took ").append(StringUtils.formatTimeDiff(now, compactionStartTime))
@@ -1329,7 +1328,7 @@ public class HStore implements Store {
       }
     }

-    this.replaceStoreFiles(inputStoreFiles, Collections.EMPTY_LIST);
+    this.replaceStoreFiles(inputStoreFiles, Collections.<StoreFile>emptyList());
     this.completeCompaction(inputStoreFiles);
   }
@@ -1540,7 +1539,7 @@ public class HStore implements Store {
     completeCompaction(delSfs);
     LOG.info("Completed removal of " + delSfs.size() + " unnecessary (expired) file(s) in "
         + this + " of " + this.getRegionInfo().getRegionNameAsString()
-        + "; total size for store is " + StringUtils.humanReadableInt(storeSize));
+        + "; total size for store is " + StringUtils.byteDesc(storeSize));
   }

   @Override
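The humanReadableInt-to-byteDesc swaps here and in the other files change log formatting only: byteDesc renders byte counts with binary units. A quick sketch of the effect (outputs shown in comments are indicative):

    import org.apache.hadoop.util.StringUtils;

    public class ByteDescDemo {
      public static void main(String[] args) {
        System.out.println(StringUtils.byteDesc(16777216L));   // "16 MB"
        System.out.println(StringUtils.byteDesc(134217728L));  // "128 MB"
      }
    }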
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
index 8179c98..987b90e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
@@ -130,7 +130,10 @@ class LogRoller extends HasThread implements WALActionsListener {
       if (r != null) {
         requester = this.services.getFlushRequester();
         if (requester != null) {
-          requester.requestFlush(r);
+          // If we do a selective flush, some column families might remain in
+          // the memstore for a long time, and might cause old logs to
+          // accumulate. Hence, we do not request a selective flush here.
+          requester.requestFlush(r, false);
           scheduled = true;
         }
       }
Choosing the bigger."); } regionToFlush = bestAnyRegion; @@ -180,7 +180,7 @@ class MemStoreFlusher implements FlushRequester { Preconditions.checkState(regionToFlush.memstoreSize.get() > 0); LOG.info("Flush of region " + regionToFlush + " due to global heap pressure"); - flushedOne = flushRegion(regionToFlush, true); + flushedOne = flushRegion(regionToFlush, true, false); if (!flushedOne) { LOG.info("Excluding unflushable region " + regionToFlush + " - trying to find a different region to flush."); @@ -206,7 +206,7 @@ class MemStoreFlusher implements FlushRequester { if (fqe == null || fqe instanceof WakeupFlushThread) { if (isAboveLowWaterMark()) { LOG.debug("Flush thread woke up because memory above low water=" - + StringUtils.humanReadableInt(globalMemStoreLimitLowMark)); + + StringUtils.byteDesc(globalMemStoreLimitLowMark)); if (!flushOneForGlobalPressure()) { // Wasn't able to flush any region, but we're above low water mark // This is unlikely to happen, but might happen when closing the @@ -293,23 +293,23 @@ class MemStoreFlusher implements FlushRequester { getGlobalMemstoreSize() >= globalMemStoreLimitLowMark; } - public void requestFlush(HRegion r) { + public void requestFlush(HRegion r, boolean selectiveFlushRequest) { synchronized (regionsInQueue) { if (!regionsInQueue.containsKey(r)) { // This entry has no delay so it will be added at the top of the flush // queue. It'll come out near immediately. - FlushRegionEntry fqe = new FlushRegionEntry(r); + FlushRegionEntry fqe = new FlushRegionEntry(r, selectiveFlushRequest); this.regionsInQueue.put(r, fqe); this.flushQueue.add(fqe); } } } - public void requestDelayedFlush(HRegion r, long delay) { + public void requestDelayedFlush(HRegion r, long delay, boolean selectiveFlushRequest) { synchronized (regionsInQueue) { if (!regionsInQueue.containsKey(r)) { // This entry has some delay - FlushRegionEntry fqe = new FlushRegionEntry(r); + FlushRegionEntry fqe = new FlushRegionEntry(r, selectiveFlushRequest); fqe.requeue(delay); this.regionsInQueue.put(r, fqe); this.flushQueue.add(fqe); @@ -362,7 +362,7 @@ class MemStoreFlusher implements FlushRequester { } } - /* + /** * A flushRegion that checks store file count. If too many, puts the flush * on delay queue to retry later. * @param fqe @@ -404,22 +404,24 @@ class MemStoreFlusher implements FlushRequester { return true; } } - return flushRegion(region, false); + return flushRegion(region, false, fqe.isSelectiveFlushRequest()); } - /* + /** * Flush a region. * @param region Region to flush. * @param emergencyFlush Set if we are being force flushed. If true the region * needs to be removed from the flush queue. If false, when we were called * from the main flusher run loop and we got the entry to flush by calling * poll on the flush queue (which removed it). - * + * @param selectiveFlushRequest Do we want to selectively flush only the + * column families that dominate the memstore size? * @return true if the region was successfully flushed, false otherwise. If * false, there will be accompanying log messages explaining why the log was * not flushed. 
   */
-  private boolean flushRegion(final HRegion region, final boolean emergencyFlush) {
+  private boolean flushRegion(final HRegion region, final boolean emergencyFlush,
+      boolean selectiveFlushRequest) {
    synchronized (this.regionsInQueue) {
      FlushRegionEntry fqe = this.regionsInQueue.remove(region);
      if (fqe != null && emergencyFlush) {
@@ -431,7 +433,7 @@ class MemStoreFlusher implements FlushRequester {
    lock.readLock().lock();
    try {
      notifyFlushRequest(region, emergencyFlush);
-      boolean shouldCompact = region.flushcache().isCompactionNeeded();
+      boolean shouldCompact = region.flushcache(selectiveFlushRequest).isCompactionNeeded();
      // We just want to check the size
      boolean shouldSplit = region.checkSplit() != null;
      if (shouldSplit) {
@@ -511,9 +513,9 @@ class MemStoreFlusher implements FlushRequester {
          startTime = EnvironmentEdgeManager.currentTime();
          LOG.info("Blocking updates on " + server.toString() +
            ": the global memstore size " +
-            StringUtils.humanReadableInt(server.getRegionServerAccounting().getGlobalMemstoreSize()) +
+            StringUtils.byteDesc(server.getRegionServerAccounting().getGlobalMemstoreSize()) +
            " is >= than blocking " +
-            StringUtils.humanReadableInt(globalMemStoreLimit) + " size");
+            StringUtils.byteDesc(globalMemStoreLimit) + " size");
        }
        blocked = true;
        wakeupFlushThread();
@@ -636,11 +638,14 @@ class MemStoreFlusher implements FlushRequester {
    private final long createTime;
    private long whenToExpire;
    private int requeueCount = 0;
-
-    FlushRegionEntry(final HRegion r) {
+
+    private boolean selectiveFlushRequest;
+
+    FlushRegionEntry(final HRegion r, boolean selectiveFlushRequest) {
      this.region = r;
      this.createTime = EnvironmentEdgeManager.currentTime();
      this.whenToExpire = this.createTime;
+      this.selectiveFlushRequest = selectiveFlushRequest;
    }

    /**
@@ -660,6 +665,13 @@ class MemStoreFlusher implements FlushRequester {
    }

    /**
+     * @return Is this a request for a selective flush?
+     */
+    public boolean isSelectiveFlushRequest() {
+      return selectiveFlushRequest;
+    }
+
+    /**
     * @param when When to expire, when to come up out of the queue.
     * Specify in milliseconds. This method adds EnvironmentEdgeManager.currentTime()
     * to whatever you pass.
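A self-contained miniature of FlushRegionEntry's queueing behavior (MiniFlushEntry is a hypothetical stand-in): the selective flag rides along with the delayed entry until a flush handler polls it off the queue.

    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;

    final class MiniFlushEntry implements Delayed {
      final String region;
      final boolean selective;       // honored when the flush finally runs
      private final long whenToExpire; // epoch millis

      MiniFlushEntry(String region, boolean selective, long delayMs) {
        this.region = region;
        this.selective = selective;
        this.whenToExpire = System.currentTimeMillis() + delayMs;
      }

      public long getDelay(TimeUnit unit) {
        return unit.convert(whenToExpire - System.currentTimeMillis(),
            TimeUnit.MILLISECONDS);
      }

      public int compareTo(Delayed o) {
        return Long.signum(getDelay(TimeUnit.MILLISECONDS)
            - o.getDelay(TimeUnit.MILLISECONDS));
      }
      // Usage: new java.util.concurrent.DelayQueue<MiniFlushEntry>()
      //     .add(new MiniFlushEntry("r1", true, 100));
    }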
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index ef0f456..761f6fb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -1069,7 +1069,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
      LOG.info("Flushing " + region.getRegionNameAsString());
      boolean shouldFlush = true;
      if (request.hasIfOlderThanTs()) {
-        shouldFlush = region.getLastFlushTime() < request.getIfOlderThanTs();
+        shouldFlush = region.getEarliestFlushTimeForAllStores() < request.getIfOlderThanTs();
      }
      FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder();
      if (shouldFlush) {
@@ -1080,7 +1080,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
        }
        builder.setFlushed(result);
      }
-      builder.setLastFlushTime(region.getLastFlushTime());
+      builder.setLastFlushTime(region.getEarliestFlushTimeForAllStores());
      return builder.build();
    } catch (DroppedSnapshotException ex) {
      // Cache flush can fail in a few places. If it fails in a critical
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index e6f4d66..ebd4434 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -293,9 +293,9 @@ class FSHLog implements HLog, Syncable {
  /**
   * This lock ties all operations on oldestFlushingRegionSequenceIds and
   * oldestFlushedRegionSequenceIds Maps with the exception of append's putIfAbsent call into
-   * oldestUnflushedSeqNums. We use these Maps to find out the low bound regions sequence id, or
-   * to find regions with old sequence ids to force flush; we are interested in old stuff not the
-   * new additions (TODO: IS THIS SAFE? CHECK!).
+   * oldestUnflushedRegionSequenceIds. We use these Maps to find out the low bound regions
+   * sequence id, or to find regions with old sequence ids to force flush; we are interested in
+   * old stuff not the new additions (TODO: IS THIS SAFE? CHECK!).
   */
  private final Object regionSequenceIdLock = new Object();
@@ -714,13 +714,14 @@ class FSHLog implements HLog, Syncable {
   * @throws IOException
   */
  private void cleanOldLogs() throws IOException {
-    Map<byte[], Long> oldestFlushingSeqNumsLocal = null;
-    Map<byte[], Long> oldestUnflushedSeqNumsLocal = null;
+    Map<byte[], Long> lowestFlushingRegionSequenceIdsLocal = null;
+    Map<byte[], Long> oldestUnflushedRegionSequenceIdsLocal = null;
    List<Path> logsToArchive = new ArrayList<Path>();
    // make a local copy so as to avoid locking when we iterate over these maps.
    synchronized (regionSequenceIdLock) {
-      oldestFlushingSeqNumsLocal = new HashMap<byte[], Long>(this.lowestFlushingRegionSequenceIds);
-      oldestUnflushedSeqNumsLocal =
+      lowestFlushingRegionSequenceIdsLocal =
+          new HashMap<byte[], Long>(this.lowestFlushingRegionSequenceIds);
+      oldestUnflushedRegionSequenceIdsLocal =
          new HashMap<byte[], Long>(this.oldestUnflushedRegionSequenceIds);
    }
@@ -728,8 +729,8 @@ class FSHLog implements HLog, Syncable {
    for (Map.Entry<Path, Map<byte[], Long>> e : byWalRegionSequenceIds.entrySet()) {
      // iterate over the log file versions and archive if possible.
      Path log = e.getKey();
      Map<byte[], Long> sequenceNums = e.getValue();
      // iterate over the map for this log file, and tell whether it should be archive or not.
-      if (areAllRegionsFlushed(sequenceNums, oldestFlushingSeqNumsLocal,
-          oldestUnflushedSeqNumsLocal)) {
+      if (areAllRegionsFlushed(sequenceNums, lowestFlushingRegionSequenceIdsLocal,
+          oldestUnflushedRegionSequenceIdsLocal)) {
        logsToArchive.add(log);
        LOG.debug("WAL file ready for archiving " + log);
      }
@@ -1578,20 +1579,47 @@ class FSHLog implements HLog, Syncable {
  }

  @Override
-  public boolean startCacheFlush(final byte[] encodedRegionName) {
+  public long startCacheFlush(final byte[] encodedRegionName, long oldestSeqIdInStoresToFlush,
+      long oldestSeqIdInStoresNotToFlush, AtomicLong sequenceId) throws IOException {
+    long flushSeqId;
    Long oldRegionSeqNum = null;
    if (!closeBarrier.beginOp()) {
      LOG.info("Flush will not be started for " + Bytes.toString(encodedRegionName) +
        " - because the server is closing.");
-      return false;
+      return NO_SEQUENCE_ID;
    }
    synchronized (regionSequenceIdLock) {
-      oldRegionSeqNum = this.oldestUnflushedRegionSequenceIds.remove(encodedRegionName);
-      if (oldRegionSeqNum != null) {
-        Long oldValue =
-            this.lowestFlushingRegionSequenceIds.put(encodedRegionName, oldRegionSeqNum);
-        assert oldValue ==
-          null : "Flushing map not cleaned up for " + Bytes.toString(encodedRegionName);
+      if (oldestSeqIdInStoresNotToFlush == Long.MAX_VALUE) {
+        oldRegionSeqNum = this.oldestUnflushedRegionSequenceIds.remove(encodedRegionName);
+        if (oldRegionSeqNum != null) {
+          Long oldValue = this.lowestFlushingRegionSequenceIds.put(encodedRegionName,
+              oldRegionSeqNum);
+          assert oldValue == null: "Flushing map not cleaned up for "
+              + Bytes.toString(encodedRegionName);
+        }
+        // Get a sequence id that we can use to denote the flush. It will be one beyond the last
+        // edit that made it into the hfile (the below does not add an edit, it just asks the
+        // WAL system to return next sequence edit).
+        HLogKey key = new HLogKey(encodedRegionName, null);
+        appendNoSync(null, null, key, WALEdit.EMPTY_WALEDIT, sequenceId, false, null);
+        flushSeqId = key.getSequenceId();
+      } else {
+        // Amongst the Stores not being flushed, what is the smallest sequence
+        // number? Put that in this map.
+        oldRegionSeqNum = this.oldestUnflushedRegionSequenceIds.replace(encodedRegionName,
+            oldestSeqIdInStoresNotToFlush);
+
+        // Amongst the Stores being flushed, what is the smallest sequence
+        // number? Put that in this map.
+        this.lowestFlushingRegionSequenceIds.put(encodedRegionName, oldestSeqIdInStoresToFlush);
+
+        // During Log Replay, we can safely discard any edits that have
+        // the sequence number less than the smallest sequence id amongst the
+        // stores that we are not flushing. This might re-apply some edits
+        // which belonged to stores which are going to be flushed, but we
+        // expect these operations to be idempotent anyways, and this is
+        // simpler.
+        flushSeqId = oldestSeqIdInStoresNotToFlush;
      }
    }
    if (oldRegionSeqNum == null) {
@@ -1603,7 +1631,7 @@ class FSHLog implements HLog, Syncable {
      LOG.warn("Couldn't find oldest seqNum for the region we are about to flush: ["
        + Bytes.toString(encodedRegionName) + "]");
    }
-    return true;
+    return flushSeqId;
  }

  @Override
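The accounting behind the partial-flush branch above, in miniature (WalTruncationPoint is a hypothetical helper, with store names standing in for Store references): after flushing a subset of stores, the region's oldest unflushed sequence id becomes the minimum over the stores that kept their edits, which in turn bounds how far WAL files can be archived.

    import java.util.Map;
    import java.util.Set;

    final class WalTruncationPoint {
      // oldestUnflushedByStore: oldest edit still in each store's memstore.
      // flushedStores: the stores that were just flushed.
      static long afterPartialFlush(Map<String, Long> oldestUnflushedByStore,
          Set<String> flushedStores) {
        long min = Long.MAX_VALUE;
        for (Map.Entry<String, Long> e : oldestUnflushedByStore.entrySet()) {
          if (!flushedStores.contains(e.getKey())) {
            min = Math.min(min, e.getValue());
          }
        }
        return min; // Long.MAX_VALUE means every store was flushed
      }
    }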
+ flushSeqId = oldestSeqIdInStoresNotToFlush; } } if (oldRegionSeqNum == null) { @@ -1603,7 +1631,7 @@ class FSHLog implements HLog, Syncable { LOG.warn("Couldn't find oldest seqNum for the region we are about to flush: [" + Bytes.toString(encodedRegionName) + "]"); } - return true; + return flushSeqId; } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java index eb3692e..b9fc204 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java @@ -404,10 +404,11 @@ public interface HLog { * In case of flush being aborted, we put the stashed value back; in case of flush succeeding, * the seqNum of that first edit after start becomes the valid oldest seqNum for this region. * - * @return true if the flush can proceed, false in case wal is closing (ususally, when server is - closing) and flush couldn't be started. + * @return the flushSeqId of the flush if the flush can proceed, {@link #NO_SEQUENCE_ID} in case wal + is closing (usually, when server is closing) and flush couldn't be started. */ - boolean startCacheFlush(final byte[] encodedRegionName); + long startCacheFlush(final byte[] encodedRegionName, long oldestSeqIdInStoresToFlush, + long oldestSeqIdInStoresNotToFlush, AtomicLong sequenceId) throws IOException; /** * Complete the cache flush. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java index be6870c..001488c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java @@ -271,7 +271,7 @@ public class TestIOFencing { compactingRegion = (CompactionBlockerRegion)testRegions.get(0); LOG.info("Blocking compactions"); compactingRegion.stopCompactions(); - long lastFlushTime = compactingRegion.getLastFlushTime(); + long lastFlushTime = compactingRegion.getEarliestFlushTimeForAllStores(); // Load some rows TEST_UTIL.loadNumericRows(table, FAMILY, 0, FIRST_BATCH_COUNT); @@ -287,7 +287,7 @@ public class TestIOFencing { // Wait till flush has happened, otherwise there won't be multiple store files long startWaitTime = System.currentTimeMillis(); - while (compactingRegion.getLastFlushTime() <= lastFlushTime || + while (compactingRegion.getEarliestFlushTimeForAllStores() <= lastFlushTime || compactingRegion.countStoreFiles() <= 1) { LOG.info("Waiting for the region to flush " + compactingRegion.getRegionNameAsString()); Thread.sleep(1000); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPerColumnFamilyFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPerColumnFamilyFlush.java new file mode 100644 index 0000000..91a1909 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPerColumnFamilyFlush.java @@ -0,0 +1,626 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy; +import org.apache.hadoop.hbase.regionserver.DefaultMemStore; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil; +import org.apache.hadoop.hbase.util.Pair; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.google.common.hash.Hashing; + +/** + * This test verifies the correctness of the Per Column Family flushing strategy + */ +@Category(MediumTests.class) +public class TestPerColumnFamilyFlush { + private static final Log LOG = LogFactory.getLog(TestPerColumnFamilyFlush.class); + + HRegion region = null; + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static final String DIR = TEST_UTIL.getClusterTestDir() + "/TestHRegion/"; + + public static final TableName TABLENAME = TableName.valueOf("TestPerColumnFamilyFlush", "t1"); + + public static final byte[][] families = { + Bytes.toBytes("f1"), Bytes.toBytes("f2"), Bytes.toBytes("f3"), Bytes.toBytes("f4"), + Bytes.toBytes("f5") + }; + + public static final byte[] FAMILY1 = families[0]; + + public static final byte[] FAMILY2 = families[1]; + + public static final byte[] FAMILY3 = families[2]; + + private void initHRegion(String callingMethod, Configuration conf) throws IOException { + HTableDescriptor htd = new HTableDescriptor(TABLENAME); + for (byte[] family: families) { + htd.addFamily(new HColumnDescriptor(family)); + } + HRegionInfo info = new HRegionInfo(TABLENAME, null, null, false); + Path path = new Path(DIR + callingMethod); + region = HRegion.createHRegion(info, path, conf, htd); + } + + // A helper function to create puts. + private Put createPut(int familyNum, int putNum) { + byte[] qf = Bytes.toBytes("q" + familyNum); + byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum); + byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum); + Put p = new Put(row); + p.add(families[familyNum - 1], qf, val); + return p; + } + + // A helper function to create gets.
+ private Get createGet(int familyNum, int putNum) { + byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum); + return new Get(row); + } + + // A helper function to verify edits. + void verifyEdit(int familyNum, int putNum, HTable table) throws IOException { + Result r = table.get(createGet(familyNum, putNum)); + byte[] family = families[familyNum - 1]; + byte[] qf = Bytes.toBytes("q" + familyNum); + byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum); + assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum), r.getFamilyMap(family)); + assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum), + r.getFamilyMap(family).get(qf)); + assertTrue(("Incorrect value for Put#" + putNum + " for CF# " + familyNum), + Arrays.equals(r.getFamilyMap(family).get(qf), val)); + } + + @Test + public void testSelectiveFlushWhenEnabled() throws IOException { + // Set up the configuration + Configuration conf = HBaseConfiguration.create(); + conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 200 * 1024); + conf.setBoolean(HConstants.HREGION_MEMSTORE_PER_COLUMN_FAMILY_FLUSH, true); + conf.setLong(HConstants.HREGION_MEMSTORE_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND, 100 * 1024); + // Initialize the HRegion + initHRegion("testSelectiveFlushWhenEnabled", conf); + // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3 + for (int i = 1; i <= 1200; i++) { + region.put(createPut(1, i)); + + if (i <= 100) { + region.put(createPut(2, i)); + if (i <= 50) { + region.put(createPut(3, i)); + } + } + } + + long totalMemstoreSize = region.getMemstoreSize().get(); + + // Find the smallest LSNs for edits wrt each CF. + long smallestSeqCF1 = region.getOldestSeqIdOfStore(FAMILY1); + long smallestSeqCF2 = region.getOldestSeqIdOfStore(FAMILY2); + long smallestSeqCF3 = region.getOldestSeqIdOfStore(FAMILY3); + + // Find the sizes of the memstores of each CF. + long cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize(); + long cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize(); + long cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize(); + + // Get the overall smallest LSN in the region's memstores. + long smallestSeqInRegionCurrentMemstore = region.getLog().getEarliestMemstoreSeqNum( + region.getRegionInfo().getEncodedNameAsBytes()); + + // The overall smallest LSN in the region's memstores should be the same as + // the LSN of the smallest edit in CF1 + assertEquals(smallestSeqCF1, smallestSeqInRegionCurrentMemstore); + + // Some other sanity checks. + assertTrue(smallestSeqCF1 < smallestSeqCF2); + assertTrue(smallestSeqCF2 < smallestSeqCF3); + assertTrue(cf1MemstoreSize > 0); + assertTrue(cf2MemstoreSize > 0); + assertTrue(cf3MemstoreSize > 0); + + // The total memstore size should be the same as the sum of the sizes of + // memstores of CF1, CF2 and CF3. + assertEquals(totalMemstoreSize + 3 * DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize + + cf2MemstoreSize + cf3MemstoreSize); + + // Flush! + region.flushcache(true); + + // Will use these to check if anything changed.
+ long oldCF2MemstoreSize = cf2MemstoreSize; + long oldCF3MemstoreSize = cf3MemstoreSize; + + // Recalculate everything + cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize(); + cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize(); + cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize(); + totalMemstoreSize = region.getMemstoreSize().get(); + smallestSeqInRegionCurrentMemstore = region.getLog().getEarliestMemstoreSeqNum( + region.getRegionInfo().getEncodedNameAsBytes()); + + // We should have cleared out only CF1, since we chose the flush thresholds + // and number of puts accordingly. + assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize); + // Nothing should have happened to CF2, ... + assertEquals(cf2MemstoreSize, oldCF2MemstoreSize); + // ... or CF3 + assertEquals(cf3MemstoreSize, oldCF3MemstoreSize); + // Now the smallest LSN in the region should be the same as the smallest + // LSN in the memstore of CF2. + assertEquals(smallestSeqInRegionCurrentMemstore, smallestSeqCF2); + // Of course, this should hold too. + assertEquals(totalMemstoreSize + 2 * DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSize + + cf3MemstoreSize); + + // Now add more puts (mostly for CF2), so that we only flush CF2 this time. + for (int i = 1200; i < 2400; i++) { + region.put(createPut(2, i)); + + // Add only 100 puts for CF3 + if (i - 1200 < 100) { + region.put(createPut(3, i)); + } + } + + // How much does the CF3 memstore occupy? Will be used later. + oldCF3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize(); + + // Flush again + region.flushcache(true); + + // Recalculate everything + cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize(); + cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize(); + cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize(); + totalMemstoreSize = region.getMemstoreSize().get(); + smallestSeqInRegionCurrentMemstore = region.getLog().getEarliestMemstoreSeqNum( + region.getRegionInfo().getEncodedNameAsBytes()); + + // CF1 and CF2, both should be absent. + assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize); + assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSize); + // CF3 shouldn't have been touched. + assertEquals(cf3MemstoreSize, oldCF3MemstoreSize); + assertEquals(totalMemstoreSize + DefaultMemStore.DEEP_OVERHEAD, cf3MemstoreSize); + assertEquals(smallestSeqInRegionCurrentMemstore, smallestSeqCF3); + + // What happens when we hit the memstore limit, but we are not able to find + // any Column Family above the threshold? + // In that case, we should flush all the CFs. + + // Clearing the existing memstores. + region.flushcache(false); + + // The memstore limit is 200*1024 and the column family flush threshold is + // around 100*1024. We try to just hit the memstore limit with each CF's + // memstore being below the CF flush threshold. + for (int i = 1; i <= 300; i++) { + region.put(createPut(1, i)); + region.put(createPut(2, i)); + region.put(createPut(3, i)); + region.put(createPut(4, i)); + region.put(createPut(5, i)); + } + + region.flushcache(true); + // Since we won't find any CF above the threshold, and hence no specific + // store to flush, we should flush all the memstores.
+ assertEquals(0, region.getMemstoreSize().get()); + } + + @Test + public void testSelectiveFlushWhenNotEnabled() throws IOException { + // Set up the configuration + Configuration conf = HBaseConfiguration.create(); + conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 200 * 1024); + conf.setBoolean(HConstants.HREGION_MEMSTORE_PER_COLUMN_FAMILY_FLUSH, false); + conf.setLong(HConstants.HREGION_MEMSTORE_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND, 100 * 1024); + + // Initialize the HRegion + initHRegion("testSelectiveFlushWhenNotEnabled", conf); + // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3 + for (int i = 1; i <= 1200; i++) { + region.put(createPut(1, i)); + if (i <= 100) { + region.put(createPut(2, i)); + if (i <= 50) { + region.put(createPut(3, i)); + } + } + } + + long totalMemstoreSize = region.getMemstoreSize().get(); + + // Find the sizes of the memstores of each CF. + long cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize(); + long cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize(); + long cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize(); + + // Some other sanity checks. + assertTrue(cf1MemstoreSize > 0); + assertTrue(cf2MemstoreSize > 0); + assertTrue(cf3MemstoreSize > 0); + + // The total memstore size should be the same as the sum of the sizes of + // memstores of CF1, CF2 and CF3. + assertEquals(totalMemstoreSize + 3 * DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize + + cf2MemstoreSize + cf3MemstoreSize); + + // Flush! + region.flushcache(true); + + cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize(); + cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize(); + cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize(); + totalMemstoreSize = region.getMemstoreSize().get(); + long smallestSeqInRegionCurrentMemstore = region.getLog().getEarliestMemstoreSeqNum( + region.getRegionInfo().getEncodedNameAsBytes()); + + // Everything should have been cleared + assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize); + assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSize); + assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf3MemstoreSize); + assertEquals(0, totalMemstoreSize); + assertEquals(HConstants.NO_SEQNUM, smallestSeqInRegionCurrentMemstore); + } + + // Find the (first) region which belongs to the specified table.
+ private static Pair<HRegion, HRegionServer> getRegionWithName(TableName tableName) { + MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster(); + List<JVMClusterUtil.RegionServerThread> rsts = cluster.getRegionServerThreads(); + for (int i = 0; i < cluster.getRegionServerThreads().size(); i++) { + HRegionServer hrs = rsts.get(i).getRegionServer(); + for (HRegion region: hrs.getOnlineRegions(tableName)) { + return Pair.newPair(region, hrs); + } + } + return null; + } + + @Test + public void testLogReplay() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setBoolean(HConstants.HREGION_MEMSTORE_PER_COLUMN_FAMILY_FLUSH, true); + conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 20000); + // Carefully chosen limits so that the memstore just flushes when we're done + conf.setLong(HConstants.HREGION_MEMSTORE_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND, 10000); + final int numRegionServers = 4; + TEST_UTIL.startMiniCluster(numRegionServers); + TEST_UTIL.getHBaseAdmin().createNamespace( + NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build()); + HTable table = TEST_UTIL.createTable(TABLENAME, families); + HTableDescriptor htd = table.getTableDescriptor(); + + for (byte[] family: families) { + if (!htd.hasFamily(family)) { + htd.addFamily(new HColumnDescriptor(family)); + } + } + + // Add 80 edits for CF1, 10 for CF2, 10 for CF3. + // These will all be interleaved in the log. + for (int i = 1; i <= 80; i++) { + table.put(createPut(1, i)); + if (i <= 10) { + table.put(createPut(2, i)); + table.put(createPut(3, i)); + } + } + table.flushCommits(); + Thread.sleep(1000); + + Pair<HRegion, HRegionServer> desiredRegionAndServer = getRegionWithName(TABLENAME); + HRegion desiredRegion = desiredRegionAndServer.getFirst(); + assertTrue("Could not find a region which hosts the table.", desiredRegion != null); + + // Flush the region selectively. + desiredRegion.flushcache(true); + + long totalMemstoreSize; + long cf1MemstoreSize, cf2MemstoreSize, cf3MemstoreSize; + totalMemstoreSize = desiredRegion.getMemstoreSize().get(); + + // Find the sizes of the memstores of each CF. + cf1MemstoreSize = desiredRegion.getStore(FAMILY1).getMemStoreSize(); + cf2MemstoreSize = desiredRegion.getStore(FAMILY2).getMemStoreSize(); + cf3MemstoreSize = desiredRegion.getStore(FAMILY3).getMemStoreSize(); + + // CF1 should have been flushed + assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize); + // CF2 and CF3 shouldn't have been flushed. + assertTrue(cf2MemstoreSize > 0); + assertTrue(cf3MemstoreSize > 0); + assertEquals(totalMemstoreSize + 2 * DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSize + + cf3MemstoreSize); + + // Wait for the RS report to go across to the master, so that the master + // is aware of which sequence ids have been flushed, before we kill the RS. + // If in production, the RS dies before the report goes across, we will + // safely replay all the edits. + Thread.sleep(2000); + + // Abort the region server where we have the region hosted. + HRegionServer rs = desiredRegionAndServer.getSecond(); + rs.abort("testing"); + + // The aborted region server's regions will be eventually assigned to some + // other region server, and the get RPC call (inside verifyEdit()) will + // retry for some time till the regions come back up. + + // Verify that all the edits are safe. + for (int i = 1; i <= 80; i++) { + verifyEdit(1, i, table); + if (i <= 10) { + verifyEdit(2, i, table); + verifyEdit(3, i, table); + } + } + + TEST_UTIL.shutdownMiniCluster(); + } + + // Test Log Replay with Distributed Replay on.
+ // In distributed log replay, the log splitters ask the master for the + // last flushed sequence id for a region. This test ensures that we + // are doing the book-keeping correctly. + @Test + public void testLogReplayWithDistributedReplay() throws Exception { + TEST_UTIL.getConfiguration().setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true); + testLogReplay(); + } + + /** + * When a log roll is about to happen, we do a flush of the regions that will + * be affected by the log roll. These flushes cannot be selective flushes, + * otherwise we cannot roll the logs. This test ensures that we do a + * full-flush in that scenario. + * @throws IOException + */ + @Test + public void testFlushingWhenLogRolling() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setBoolean(HConstants.HREGION_MEMSTORE_PER_COLUMN_FAMILY_FLUSH, true); + conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300000); + conf.setLong(HConstants.HREGION_MEMSTORE_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND, 100000); + + // Also, let us try real hard to get a log roll to happen. + // Keep the log roll period at 2s. + conf.setLong("hbase.regionserver.logroll.period", 2000); + // Keep the block size small so that we fill up the log files very fast. + conf.setLong("hbase.regionserver.hlog.blocksize", 6144); + int maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32); + + final int numRegionServers = 4; + TEST_UTIL.startMiniCluster(numRegionServers); + TEST_UTIL.getHBaseAdmin().createNamespace( + NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build()); + HTable table = TEST_UTIL.createTable(TABLENAME, families); + HTableDescriptor htd = table.getTableDescriptor(); + + for (byte[] family: families) { + if (!htd.hasFamily(family)) { + htd.addFamily(new HColumnDescriptor(family)); + } + } + + HRegion desiredRegion = getRegionWithName(TABLENAME).getFirst(); + assertTrue("Could not find a region which hosts the table.", desiredRegion != null); + + // Add some edits. Most will be for CF1, some for CF2 and CF3. + for (int i = 1; i <= 10000; i++) { + table.put(createPut(1, i)); + if (i <= 200) { + table.put(createPut(2, i)); + table.put(createPut(3, i)); + } + table.flushCommits(); + // Keep adding until we exceed the number of log files, so that we are + // able to trigger the cleaning of old log files. + int currentNumLogFiles = desiredRegion.getLog().getNumLogFiles(); + if (currentNumLogFiles > maxLogs) { + LOG.info("The number of log files is now: " + currentNumLogFiles + + ". Expect a log roll and memstore flush."); + break; + } + } + table.close(); + // Wait for some time till the flush caused by log rolling happens. + Thread.sleep(4000); + + // We have artificially created the conditions for a log roll. When a + // log roll happens, we should flush all the column families. Testing that + // case here. + + // Individual families should have been flushed. + assertEquals(DefaultMemStore.DEEP_OVERHEAD, desiredRegion.getStore(FAMILY1).getMemStoreSize()); + assertEquals(DefaultMemStore.DEEP_OVERHEAD, desiredRegion.getStore(FAMILY2).getMemStoreSize()); + assertEquals(DefaultMemStore.DEEP_OVERHEAD, desiredRegion.getStore(FAMILY3).getMemStoreSize()); + + // And of course, the total memstore should also be clean.
+ assertEquals(0, desiredRegion.getMemstoreSize().get()); + + TEST_UTIL.shutdownMiniCluster(); + } + + private void doPut(HTableInterface table) throws IOException { + // cf1 100B per row, cf2 200B per row and cf3 400B per row + byte[] qf = Bytes.toBytes("qf"); + Random rand = new Random(); + byte[] value1 = new byte[100]; + byte[] value2 = new byte[200]; + byte[] value3 = new byte[400]; + for (int i = 0; i < 10000; i++) { + Put put = new Put(Bytes.toBytes("row-" + i)); + rand.setSeed(i); + rand.nextBytes(value1); + rand.nextBytes(value2); + rand.nextBytes(value3); + put.add(FAMILY1, qf, value1); + put.add(FAMILY2, qf, value2); + put.add(FAMILY3, qf, value3); + table.put(put); + } + } + + @Test + public void testCompareStoreFileCount() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024); + conf.setLong(HConstants.HREGION_MEMSTORE_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND, 400 * 1024); + conf.setBoolean(HConstants.HREGION_MEMSTORE_PER_COLUMN_FAMILY_FLUSH, false); + conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000); + conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY, + ConstantSizeRegionSplitPolicy.class.getName()); + + HTableDescriptor htd = new HTableDescriptor(TABLENAME); + htd.setCompactionEnabled(false); + htd.addFamily(new HColumnDescriptor(FAMILY1)); + htd.addFamily(new HColumnDescriptor(FAMILY2)); + htd.addFamily(new HColumnDescriptor(FAMILY3)); + + LOG.info("==============Test with selective flush disabled==============="); + TEST_UTIL.startMiniCluster(1); + TEST_UTIL.getHBaseAdmin().createNamespace( + NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build()); + TEST_UTIL.getHBaseAdmin().createTable(htd); + getRegionWithName(TABLENAME).getFirst(); + HConnection conn = HConnectionManager.createConnection(conf); + HTableInterface table = conn.getTable(TABLENAME); + doPut(table); + table.close(); + conn.close(); + + HRegion region = getRegionWithName(TABLENAME).getFirst(); + int cf1StoreFileCount = region.getStore(FAMILY1).getStorefilesCount(); + int cf2StoreFileCount = region.getStore(FAMILY2).getStorefilesCount(); + int cf3StoreFileCount = region.getStore(FAMILY3).getStorefilesCount(); + TEST_UTIL.shutdownMiniCluster(); + + LOG.info("==============Test with selective flush enabled==============="); + conf.setBoolean(HConstants.HREGION_MEMSTORE_PER_COLUMN_FAMILY_FLUSH, true); + TEST_UTIL.startMiniCluster(1); + TEST_UTIL.getHBaseAdmin().createNamespace( + NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build()); + TEST_UTIL.getHBaseAdmin().createTable(htd); + conn = HConnectionManager.createConnection(conf); + table = conn.getTable(TABLENAME); + doPut(table); + table.close(); + conn.close(); + + region = getRegionWithName(TABLENAME).getFirst(); + int cf1StoreFileCount1 = region.getStore(FAMILY1).getStorefilesCount(); + int cf2StoreFileCount1 = region.getStore(FAMILY2).getStorefilesCount(); + int cf3StoreFileCount1 = region.getStore(FAMILY3).getStorefilesCount(); + TEST_UTIL.shutdownMiniCluster(); + + LOG.info("disable selective flush: " + Bytes.toString(FAMILY1) + "=>" + cf1StoreFileCount + + ", " + Bytes.toString(FAMILY2) + "=>" + cf2StoreFileCount + ", " + + Bytes.toString(FAMILY3) + "=>" + cf3StoreFileCount); + LOG.info("enable selective flush: " + Bytes.toString(FAMILY1) + "=>" + cf1StoreFileCount1 + + ", " + Bytes.toString(FAMILY2) + "=>" + cf2StoreFileCount1 + ", " + + Bytes.toString(FAMILY3) + "=>" + cf3StoreFileCount1); + // smaller CFs will have fewer store files.
+ assertTrue(cf1StoreFileCount1 < cf1StoreFileCount); + assertTrue(cf2StoreFileCount1 < cf2StoreFileCount); + } + + public static void main(String[] args) throws Exception { + int numRegions = Integer.parseInt(args[0]); + long numRows = Long.parseLong(args[1]); + + HTableDescriptor htd = new HTableDescriptor(TABLENAME); + htd.setMaxFileSize(10L * 1024 * 1024 * 1024); + htd.setValue(HTableDescriptor.SPLIT_POLICY, ConstantSizeRegionSplitPolicy.class.getName()); + htd.addFamily(new HColumnDescriptor(FAMILY1)); + htd.addFamily(new HColumnDescriptor(FAMILY2)); + htd.addFamily(new HColumnDescriptor(FAMILY3)); + + Configuration conf = HBaseConfiguration.create(); + HConnection conn = HConnectionManager.createConnection(conf); + HBaseAdmin admin = new HBaseAdmin(conn); + if (admin.tableExists(TABLENAME)) { + admin.disableTable(TABLENAME); + admin.deleteTable(TABLENAME); + } + if (numRegions >= 3) { + byte[] startKey = new byte[16]; + byte[] endKey = new byte[16]; + Arrays.fill(endKey, (byte) 0xFF); + admin.createTable(htd, startKey, endKey, numRegions); + } else { + admin.createTable(htd); + } + admin.close(); + + HTableInterface table = conn.getTable(TABLENAME); + byte[] qf = Bytes.toBytes("qf"); + Random rand = new Random(); + byte[] value1 = new byte[16]; + byte[] value2 = new byte[256]; + byte[] value3 = new byte[4096]; + for (long i = 0; i < numRows; i++) { + Put put = new Put(Hashing.md5().hashLong(i).asBytes()); + rand.setSeed(i); + rand.nextBytes(value1); + rand.nextBytes(value2); + rand.nextBytes(value3); + put.add(FAMILY1, qf, value1); + put.add(FAMILY2, qf, value2); + put.add(FAMILY3, qf, value3); + table.put(put); + if (i % 10000 == 0) { + LOG.info(i + " rows put"); + } + } + table.close(); + conn.close(); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlushRegionEntry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlushRegionEntry.java index 33c5c8f..ad1f9d8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlushRegionEntry.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlushRegionEntry.java @@ -33,8 +33,8 @@ public class TestFlushRegionEntry { @Test public void test() { - FlushRegionEntry entry = new FlushRegionEntry(Mockito.mock(HRegion.class)); - FlushRegionEntry other = new FlushRegionEntry(Mockito.mock(HRegion.class)); + FlushRegionEntry entry = new FlushRegionEntry(Mockito.mock(HRegion.class), true); + FlushRegionEntry other = new FlushRegionEntry(Mockito.mock(HRegion.class), true); assertEquals(entry.hashCode(), other.hashCode()); assertEquals(entry, other); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java index 871d18e..ac21782 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java @@ -112,11 +112,11 @@ public class TestHeapMemoryManager { long oldBlockCacheSize = blockCache.maxSize; heapMemoryManager.start(); memStoreFlusher.flushType = FlushType.ABOVE_HIGHER_MARK; - memStoreFlusher.requestFlush(null); - memStoreFlusher.requestFlush(null); - memStoreFlusher.requestFlush(null); + memStoreFlusher.requestFlush(null, false); + memStoreFlusher.requestFlush(null, false); + memStoreFlusher.requestFlush(null, false); memStoreFlusher.flushType = 
FlushType.ABOVE_LOWER_MARK; - memStoreFlusher.requestFlush(null); + memStoreFlusher.requestFlush(null, false); Thread.sleep(1500); // Allow the tuner to run once and do necessary memory up assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_STEP_VALUE, oldMemstoreHeapSize, memStoreFlusher.memstoreSize); @@ -126,8 +126,8 @@ public class TestHeapMemoryManager { oldBlockCacheSize = blockCache.maxSize; // Do some more flushes before the next run of HeapMemoryTuner memStoreFlusher.flushType = FlushType.ABOVE_HIGHER_MARK; - memStoreFlusher.requestFlush(null); - memStoreFlusher.requestFlush(null); + memStoreFlusher.requestFlush(null, false); + memStoreFlusher.requestFlush(null, false); Thread.sleep(1500); assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_STEP_VALUE, oldMemstoreHeapSize, memStoreFlusher.memstoreSize); @@ -407,12 +407,12 @@ public class TestHeapMemoryManager { } @Override - public void requestFlush(HRegion region) { + public void requestFlush(HRegion region, boolean selectiveFlushRequest) { this.listener.flushRequested(flushType, region); } @Override - public void requestDelayedFlush(HRegion region, long delay) { + public void requestDelayedFlush(HRegion region, long delay, boolean selectiveFlushRequest) { } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java index 7d71fa9..3b0c546 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java @@ -609,7 +609,7 @@ public class TestHLog { htd.addFamily(new HColumnDescriptor("column")); log.append(info, tableName, cols, System.currentTimeMillis(), htd, sequenceId); - log.startCacheFlush(info.getEncodedNameAsBytes()); + log.startCacheFlush(info.getEncodedNameAsBytes(), Long.MAX_VALUE, Long.MAX_VALUE, sequenceId); log.completeCacheFlush(info.getEncodedNameAsBytes()); log.close(); Path filename = ((FSHLog) log).computeFilename(); @@ -667,7 +667,7 @@ public class TestHLog { HTableDescriptor htd = new HTableDescriptor(); htd.addFamily(new HColumnDescriptor("column")); log.append(hri, tableName, cols, System.currentTimeMillis(), htd, sequenceId); - log.startCacheFlush(hri.getEncodedNameAsBytes()); + log.startCacheFlush(hri.getEncodedNameAsBytes(), Long.MAX_VALUE, Long.MAX_VALUE, sequenceId); log.completeCacheFlush(hri.getEncodedNameAsBytes()); log.close(); Path filename = ((FSHLog) log).computeFilename(); @@ -776,7 +776,7 @@ public class TestHLog { // Flush the first region, we expect to see the first two files getting // archived. We need to append something or writer won't be rolled. 
addEdits(log, hri2, tableName2, 1, sequenceId); - log.startCacheFlush(hri.getEncodedNameAsBytes()); + log.startCacheFlush(hri.getEncodedNameAsBytes(), Long.MAX_VALUE, Long.MAX_VALUE, sequenceId); log.completeCacheFlush(hri.getEncodedNameAsBytes()); log.rollWriter(); assertEquals(2, ((FSHLog) log).getNumRolledLogFiles()); @@ -785,7 +785,7 @@ public class TestHLog { // since the oldest was completely flushed and the two others only contain // flush information addEdits(log, hri2, tableName2, 1, sequenceId); - log.startCacheFlush(hri2.getEncodedNameAsBytes()); + log.startCacheFlush(hri2.getEncodedNameAsBytes(), Long.MAX_VALUE, Long.MAX_VALUE, sequenceId); log.completeCacheFlush(hri2.getEncodedNameAsBytes()); log.rollWriter(); assertEquals(0, ((FSHLog) log).getNumRolledLogFiles()); @@ -1118,7 +1118,7 @@ public class TestHLog { assertEquals(2, ((FSHLog) hlog).getNumRolledLogFiles()); // add a waledit to table1, and flush the region. addEdits(hlog, hri1, table1, 3, sequenceId1); - flushRegion(hlog, hri1.getEncodedNameAsBytes()); + flushRegion(hlog, hri1.getEncodedNameAsBytes(), sequenceId1); // roll log; all old logs should be archived. hlog.rollWriter(); assertEquals(0, ((FSHLog) hlog).getNumRolledLogFiles()); @@ -1132,7 +1132,7 @@ public class TestHLog { assertEquals(2, ((FSHLog) hlog).getNumRolledLogFiles()); // add edits for table2, and flush hri1. addEdits(hlog, hri2, table2, 2, sequenceId2); - flushRegion(hlog, hri1.getEncodedNameAsBytes()); + flushRegion(hlog, hri1.getEncodedNameAsBytes(), sequenceId1); // the log : region-sequenceId map is // log1: region2 (unflushed) // log2: region1 (flushed) @@ -1142,7 +1142,7 @@ public class TestHLog { assertEquals(2, ((FSHLog) hlog).getNumRolledLogFiles()); // flush region2, and all logs should be archived. addEdits(hlog, hri2, table2, 2, sequenceId2); - flushRegion(hlog, hri2.getEncodedNameAsBytes()); + flushRegion(hlog, hri2.getEncodedNameAsBytes(), sequenceId2); hlog.rollWriter(); assertEquals(0, ((FSHLog) hlog).getNumRolledLogFiles()); } finally { @@ -1195,12 +1195,12 @@ public class TestHLog { assertEquals(hri1.getEncodedNameAsBytes(), regionsToFlush[0]); // flush region 1, and roll the wal file. Only last wal which has entries for region1 should // remain. - flushRegion(hlog, hri1.getEncodedNameAsBytes()); + flushRegion(hlog, hri1.getEncodedNameAsBytes(), sequenceId1); hlog.rollWriter(); // only one wal should remain now (that is for the second region). assertEquals(1, ((FSHLog) hlog).getNumRolledLogFiles()); // flush the second region - flushRegion(hlog, hri2.getEncodedNameAsBytes()); + flushRegion(hlog, hri2.getEncodedNameAsBytes(), sequenceId2); hlog.rollWriter(true); // no wal should remain now. assertEquals(0, ((FSHLog) hlog).getNumRolledLogFiles()); @@ -1217,14 +1217,15 @@ public class TestHLog { regionsToFlush = ((FSHLog) hlog).findRegionsToForceFlush(); assertEquals(2, regionsToFlush.length); // flush both regions - flushRegion(hlog, hri1.getEncodedNameAsBytes()); - flushRegion(hlog, hri2.getEncodedNameAsBytes()); + flushRegion(hlog, hri1.getEncodedNameAsBytes(), sequenceId1); + flushRegion(hlog, hri2.getEncodedNameAsBytes(), sequenceId2); hlog.rollWriter(true); assertEquals(0, ((FSHLog) hlog).getNumRolledLogFiles()); // Add an edit to region1, and roll the wal. addEdits(hlog, hri1, t1, 2, sequenceId1); // tests partial flush: roll on a partial flush, and ensure that wal is not archived. 
- hlog.startCacheFlush(hri1.getEncodedNameAsBytes()); + hlog.startCacheFlush(hri1.getEncodedNameAsBytes(), Long.MAX_VALUE, Long.MAX_VALUE, + sequenceId); hlog.rollWriter(); hlog.completeCacheFlush(hri1.getEncodedNameAsBytes()); assertEquals(1, ((FSHLog) hlog).getNumRolledLogFiles()); @@ -1284,9 +1285,11 @@ public class TestHLog { * helper method to simulate region flush for a WAL. * @param hlog * @param regionEncodedName + * @throws IOException */ - private void flushRegion(HLog hlog, byte[] regionEncodedName) { - hlog.startCacheFlush(regionEncodedName); + private void flushRegion(HLog hlog, byte[] regionEncodedName, + AtomicLong sequenceId) throws IOException { + hlog.startCacheFlush(regionEncodedName, Long.MAX_VALUE, Long.MAX_VALUE, sequenceId); hlog.completeCacheFlush(regionEncodedName); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java index 7e129e2..d623ea5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java @@ -26,6 +26,7 @@ import static org.mockito.Mockito.when; import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -780,7 +781,7 @@ public class TestWALReplay { } // Add a cache flush, shouldn't have any effect - wal.startCacheFlush(regionName); + wal.startCacheFlush(regionName, Long.MAX_VALUE, Long.MAX_VALUE, sequenceId); wal.completeCacheFlush(regionName); // Add an edit to another family, should be skipped. @@ -822,10 +823,10 @@ public class TestWALReplay { new HRegion(basedir, newWal, newFS, newConf, hri, htd, null) { @Override protected FlushResult internalFlushcache( - final HLog wal, final long myseqid, MonitoredTask status) + final HLog wal, final long myseqid, Collection<Store> storesToFlush, MonitoredTask status) throws IOException { LOG.info("InternalFlushCache Invoked"); - FlushResult fs = super.internalFlushcache(wal, myseqid, + FlushResult fs = super.internalFlushcache(wal, myseqid, storesToFlush, Mockito.mock(MonitoredTask.class)); flushcount.incrementAndGet(); return fs; @@ -937,7 +938,7 @@ public class TestWALReplay { private HRegion r; @Override - public void requestFlush(HRegion region) { + public void requestFlush(HRegion region, boolean selectiveFlushRequest) { this.listener.flushRequested(flushType, region); } @Override - public void requestFlush(HRegion region) { + public void requestFlush(HRegion region, boolean selectiveFlushRequest) { try { r.flushcache(); } catch (IOException e) { @@ -946,7 +947,7 @@ public class TestWALReplay { } @Override - public void requestDelayedFlush(HRegion region, long when) { + public void requestDelayedFlush(HRegion region, long when, boolean selectiveFlushRequest) { // TODO Auto-generated method stub } -- 1.9.1
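For anyone trying the patch out: the tests above drive the feature entirely through three configuration knobs. Below is a minimal, self-contained sketch of a client-side setup; the class name is illustrative only, and the 128MB/16MB sizes simply mirror the shipped defaults rather than anything mandated by the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;

// Illustrative class name; only the three conf keys come from this patch.
public class PerColumnFamilyFlushConfig {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Turn selective (per column family) flushing on or off.
    conf.setBoolean(HConstants.HREGION_MEMSTORE_PER_COLUMN_FAMILY_FLUSH, true);
    // Region-wide memstore size that triggers a flush request (128MB is the default).
    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 128 * 1024 * 1024L);
    // Only column families whose memstore exceeds this lower bound are flushed;
    // if none qualifies, all memstores in the region are flushed as before.
    // Keep this under half of hbase.hregion.memstore.flush.size.
    conf.setLong(HConstants.HREGION_MEMSTORE_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND,
        16 * 1024 * 1024L);
    System.out.println("per-CF flush enabled with a 16MB per-family lower bound");
  }
}

With these settings, a region that crosses the 128MB limit flushes only the families above the 16MB bound, which is exactly the behavior testSelectiveFlushWhenEnabled and testCompareStoreFileCount exercise at smaller sizes.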