From b90cf0c2de33ebc0511b62efe196f22906ae60a9 Mon Sep 17 00:00:00 2001
From: zhangduo
Date: Mon, 13 Oct 2014 12:19:34 +0800
Subject: [PATCH] MOD: HBASE-10201

---
 .../org/apache/hadoop/hbase/HTableDescriptor.java  |   3 +
 .../java/org/apache/hadoop/hbase/HConstants.java   |  15 +
 hbase-common/src/main/resources/hbase-default.xml  |  13 +
 .../hadoop/hbase/regionserver/FlushRequester.java  |   6 +-
 .../apache/hadoop/hbase/regionserver/HRegion.java  | 312 ++++++++----
 .../hadoop/hbase/regionserver/HRegionServer.java   |   4 +-
 .../apache/hadoop/hbase/regionserver/HStore.java   |  30 +-
 .../hadoop/hbase/regionserver/LogRoller.java       |   5 +-
 .../apache/hadoop/hbase/regionserver/MemStore.java |  83 +++-
 .../hadoop/hbase/regionserver/MemStoreFlusher.java |  41 +-
 .../apache/hadoop/hbase/regionserver/Store.java    |  12 +-
 .../hadoop/hbase/regionserver/wal/FSHLog.java      |  54 ++-
 .../apache/hadoop/hbase/regionserver/wal/HLog.java |  41 +-
 .../hadoop/hbase/regionserver/wal/HLogUtil.java    |   2 +-
 .../org/apache/hadoop/hbase/TestIOFencing.java     |   4 +-
 .../hadoop/hbase/TestPerColumnFamilyFlush.java     | 528 +++++++++++++++++++++
 .../hadoop/hbase/regionserver/TestMemStore.java    |   4 +-
 .../hadoop/hbase/regionserver/TestStore.java       |  97 ++--
 .../hadoop/hbase/regionserver/wal/FaultyHLog.java  |   2 +-
 .../hadoop/hbase/regionserver/wal/TestHLog.java    |  28 +-
 .../hbase/regionserver/wal/TestWALReplay.java      |  10 +-
 21 files changed, 1079 insertions(+), 215 deletions(-)
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/TestPerColumnFamilyFlush.java

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
index c32ad56..22ec156 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
@@ -213,6 +213,9 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> {
    * the contents are flushed to the store files
    */
   public static final long DEFAULT_MEMSTORE_FLUSH_SIZE = 1024*1024*128L;
+
+  public static final long DEFAULT_MEMSTORE_COLUMNFAMILY_FLUSH_SIZE =
+      1024*1024*16L;
 
   private final static Map<String, String> DEFAULT_VALUES =
       new HashMap<String, String>();

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 5eddd7f..a58ff48 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -328,6 +328,21 @@ public final class HConstants {
   /** Conf key for the memstore size at which we flush the memstore */
   public static final String HREGION_MEMSTORE_FLUSH_SIZE =
       "hbase.hregion.memstore.flush.size";
+
+  /** Conf key for enabling per-column-family flushing of memstores */
+  public static final String HREGION_MEMSTORE_PER_COLUMN_FAMILY_FLUSH =
+      "hbase.hregion.memstore.percolumnfamilyflush.enabled";
+
+  /** Default value for the per-column-family flush knob */
+  public static final Boolean DEFAULT_HREGION_MEMSTORE_PER_COLUMN_FAMILY_FLUSH =
+      false;
+
+  /**
+   * If per-column-family flushing is enabled, this is the minimum size
+   * at which a column family's memstore is flushed.
+   */
+  public static final String HREGION_MEMSTORE_COLUMNFAMILY_FLUSH_SIZE =
+      "hbase.hregion.memstore.percolumnfamilyflush.flush.size";
 
   public static final String HREGION_EDITS_REPLAY_SKIP_ERRORS =
       "hbase.hregion.edits.replay.skip.errors";

diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index f285cf7..ceba6a5 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -569,6 +569,19 @@ possible configurations would overwhelm and obscure the important.
     every hbase.server.thread.wakefrequency.
     </description>
   </property>
+  <property>
+    <name>hbase.hregion.memstore.percolumnfamilyflush.flush.size</name>
+    <value>16777216</value>
+    <description>
+    If per column family flushing is turned on, then every time that we hit the
+    total memstore limit, we find out all the column families whose memstores
+    exceed this value, and only flush them, while retaining the others whose
+    memstores are lower than this limit. If none of the families have their
+    memstore size more than this, all the memstores will be flushed
+    (just as usual). This value should be less than half of the total memstore
+    threshold (hbase.hregion.memstore.flush.size).
+    </description>
+  </property>
   <property>
     <name>hbase.hregion.preclose.flush.size</name>
     <value>5242880</value>

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java
index d43a087..8d869c2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java
@@ -30,8 +30,12 @@ public interface FlushRequester {
    * Tell the listener the cache needs to be flushed.
    *
    * @param region the HRegion requesting the cache flush
+   * @param selectiveFlushRequest is this a selective flush request? This means
+   *                              that if some column families are dominating
+   *                              the memstore size, only those column families
+   *                              would be flushed.
    */
-  void requestFlush(HRegion region);
+  void requestFlush(HRegion region, boolean selectiveFlushRequest);
 
   /**
    * Tell the listener the cache needs to be flushed after a delay
   *
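To make the knobs above concrete: a minimal, illustrative sketch of a Configuration that enables this feature. The keys are the ones defined in HConstants and hbase-default.xml in this patch; the values shown and the surrounding class are hypothetical, not part of the change.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class PerCfFlushConfigSketch {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // Turn selective (per column family) flushing on.
        conf.setBoolean("hbase.hregion.memstore.percolumnfamilyflush.enabled", true);
        // Region-wide flush threshold: 128 MB (the default).
        conf.setLong("hbase.hregion.memstore.flush.size", 128 * 1024 * 1024L);
        // Per-CF threshold: 16 MB, i.e. well under half of the region-wide
        // threshold, as the hbase-default.xml description recommends.
        conf.setLong("hbase.hregion.memstore.percolumnfamilyflush.flush.size",
            16 * 1024 * 1024L);
      }
    }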
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 313baca..eb6d938 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -30,6 +30,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
@@ -471,7 +472,13 @@ public class HRegion implements HeapSize { // , Writable{
   long memstoreFlushSize;
   final long timestampSlop;
   final long rowProcessorTimeout;
-  private volatile long lastFlushTime;
+  // The maximum size a column family's memstore can grow up to,
+  // before being flushed.
+  long columnfamilyMemstoreFlushSize;
+  // Last flush time for each Store. Useful when we are flushing per column family.
+  private Map<Store, Long> lastStoreFlushTimeMap =
+      new ConcurrentHashMap<Store, Long>();
+  // Is selective flushing of the column families that dominate the memstore enabled?
+  final boolean perColumnFamilyFlushEnabled;
   final RegionServerServices rsServices;
   private RegionServerAccounting rsAccounting;
   private List<Pair<Long, Long>> recentFlushes = new ArrayList<Pair<Long, Long>>();
@@ -576,7 +583,11 @@ public class HRegion implements HeapSize { // , Writable{
       throw new IllegalArgumentException(MEMSTORE_FLUSH_PER_CHANGES + " can not exceed "
           + MAX_FLUSH_PER_CHANGES);
     }
-
+    this.columnfamilyMemstoreFlushSize = 0L;
+    this.perColumnFamilyFlushEnabled = conf.getBoolean(
+        HConstants.HREGION_MEMSTORE_PER_COLUMN_FAMILY_FLUSH,
+        HConstants.DEFAULT_HREGION_MEMSTORE_PER_COLUMN_FAMILY_FLUSH);
+    LOG.debug("Per column family flushing enabled: " + perColumnFamilyFlushEnabled);
     this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration",
         DEFAULT_ROWLOCK_WAIT_DURATION);
 
@@ -657,6 +668,9 @@
     this.memstoreFlushSize = flushSize;
     this.blockingMemStoreSize = this.memstoreFlushSize *
         conf.getLong("hbase.hregion.memstore.block.multiplier", 2);
+    this.columnfamilyMemstoreFlushSize = conf.getLong(
+        HConstants.HREGION_MEMSTORE_COLUMNFAMILY_FLUSH_SIZE,
+        HTableDescriptor.DEFAULT_MEMSTORE_COLUMNFAMILY_FLUSH_SIZE);
   }
 
   /**
@@ -728,7 +742,10 @@
     // Initialize split policy
     this.splitPolicy = RegionSplitPolicy.create(this, conf);
 
-    this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis();
+    long startTime = EnvironmentEdgeManager.currentTimeMillis();
+    for (Store store : stores.values()) {
+      this.lastStoreFlushTimeMap.put(store, startTime);
+    }
     // Use maximum of log sequenceid or that which was found in stores
     // (particularly if no recovered edits, seqid will be -1).
     long nextSeqid = maxSeqId + 1;
@@ -1343,9 +1360,22 @@
     return this.fs;
   }
 
-  /** @return the last time the region was flushed */
-  public long getLastFlushTime() {
-    return this.lastFlushTime;
+  /**
+   * @return the earliest time a store in the region was flushed. All
+   *         other stores in the region would have been flushed either
+   *         at, or after this time.
+   */
+  public long getMinFlushTimeForAllStores() {
+    return Collections.min(this.lastStoreFlushTimeMap.values());
+  }
+
+  /**
+   * Returns the last time a particular store was flushed.
+   * @param store The store in question
+   * @return The last time this store was flushed
+   */
+  public long getLastStoreFlushTime(Store store) {
+    return this.lastStoreFlushTimeMap.get(store);
   }
 
   //////////////////////////////////////////////////////////////////////////////
@@ -1509,6 +1539,16 @@
   }
 
   /**
+   * Flush the cache, with selective flushing disabled.
+   *
+   * @return the result of the flush
+   * @throws IOException
+   */
+  public FlushResult flushcache() throws IOException {
+    return flushcache(false);
+  }
+
+  /**
    * Flush the cache.
    *
    * When this method is called the cache will be flushed unless:
   * <ol>
   *   <li>the cache is empty</li>
   *   <li>the region is closed.</li>
   *   <li>a flush is already in progress</li>
   *   <li>writes are disabled</li>
   * </ol>
   *
   * <p>This method may block for some time, so it should not be called from a
   * time-sensitive thread.
   *
   * @param selectiveFlushRequest If true, selectively flush column families
   *                              which dominate the memstore size, provided it
   *                              is enabled in the configuration.
   * @return true if the region needs compacting
   *
   * @throws IOException general io exceptions
   * @throws DroppedSnapshotException Thrown when replay of hlog is required
   * because a Snapshot was not properly persisted.
   */
-  public FlushResult flushcache() throws IOException {
+  public FlushResult flushcache(boolean selectiveFlushRequest) throws IOException {
     // fail-fast instead of waiting on the lock
     if (this.closing.get()) {
       String msg = "Skipping flush on " + this + " because closing";
       LOG.debug(msg);
       return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
     }
+    // If a selective flush was requested, but the per-column family switch is
+    // off, we cannot do a selective flush.
+    if (selectiveFlushRequest && !perColumnFamilyFlushEnabled) {
+      LOG.debug("Selective flush requested, but per-column-family flushing is "
+          + "disabled in the configuration; falling back to a full flush.");
+      selectiveFlushRequest = false;
+    }
     MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this);
     status.setStatus("Acquiring readlock on region");
     // block waiting for the lock for flushing cache
@@ -1570,8 +1618,37 @@
         return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
       }
     }
+    Collection<Store> specificStoresToFlush = null;
     try {
-      FlushResult fs = internalFlushcache(status);
+      // We now have to flush the memstore since it has
+      // reached the threshold; however, we might not need
+      // to flush the entire memstore. If there are certain
+      // column families that are dominating the memstore size,
+      // we will flush just those. The second behavior only
+      // happens when selectiveFlushRequest is true.
+      FlushResult fs;
+
+      // If it is okay to flush the memstore by selecting the
+      // column families which dominate the size, we are going
+      // to populate the specificStoresToFlush set.
+      if (selectiveFlushRequest) {
+        specificStoresToFlush = new HashSet<Store>();
+        for (Store store : stores.values()) {
+          if (shouldFlushStore(store)) {
+            specificStoresToFlush.add(store);
+            LOG.debug("Column family " + store.getColumnFamilyName() + " of region " + this
+                + " will be flushed");
+          }
+        }
+        // Didn't find any CFs which were above the threshold for selection.
+        if (specificStoresToFlush.isEmpty()) {
+          LOG.debug("None of the column families were above the per-CF flush "
+              + "size; flushing all of them.");
+          specificStoresToFlush = stores.values();
+        }
+      } else {
+        specificStoresToFlush = stores.values();
+      }
+      fs = internalFlushcache(specificStoresToFlush, status);
 
       if (coprocessorHost != null) {
         status.setStatus("Running post-flush coprocessor hooks");
@@ -1607,7 +1684,7 @@
     }
     long now = EnvironmentEdgeManager.currentTimeMillis();
     //if we flushed in the recent past, we don't need to do again now
-    if ((now - getLastFlushTime() < flushCheckInterval)) {
+    if ((now - getMinFlushTimeForAllStores() < flushCheckInterval)) {
       return false;
     }
     //since we didn't flush in the recent past, flush now if certain conditions
@@ -1658,20 +1735,39 @@
    */
   protected FlushResult internalFlushcache(MonitoredTask status)
       throws IOException {
-    return internalFlushcache(this.log, -1, status);
+    return internalFlushcache(this.log, -1, stores.values(), status);
+  }
+
+  /**
+   * See {@link #internalFlushcache(org.apache.hadoop.hbase.monitoring.MonitoredTask)}
+   *
+   * @param storesToFlush The specific stores to flush.
+   * @param status
+   * @return object describing the flush's state
+   * @throws IOException
+   */
+  protected FlushResult internalFlushcache(Collection<Store> storesToFlush, MonitoredTask status)
+      throws IOException {
+    return internalFlushcache(this.log, -1L, storesToFlush, status);
+  }
+
+  protected FlushResult internalFlushcache(final HLog wal, final long myseqid, MonitoredTask status)
+      throws IOException {
+    return internalFlushcache(wal, myseqid, stores.values(), status);
   }
 
   /**
    * @param wal Null if we're NOT to go via hlog/wal.
    * @param myseqid The seqid to use if wal is null writing out
    * flush file.
+   * @param storesToFlush The list of stores to flush.
    * @param status
-   * @return true if the region needs compacting
+   * @return object describing the flush's state
    * @throws IOException
    * @see #internalFlushcache(MonitoredTask)
    */
   protected FlushResult internalFlushcache(
-      final HLog wal, final long myseqid, MonitoredTask status)
+      final HLog wal, final long myseqid, Collection<Store> storesToFlush, MonitoredTask status)
   throws IOException {
     if (this.rsServices != null && this.rsServices.isAborted()) {
       // Don't flush when server aborting, it's unsafe
     // Clear flush flag.
     // If nothing to flush, return and avoid logging start/stop flush.
     if (this.memstoreSize.get() <= 0) {
+      // Since there is nothing to flush, we will reset the flush times for all
+      // the stores.
+      for (Store store : stores.values()) {
+        lastStoreFlushTimeMap.put(store, startTime);
+      }
       if(LOG.isDebugEnabled()) {
         LOG.debug("Empty memstore size for the current region "+this);
       }
       return new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY,
           "Nothing to flush");
     }
-    LOG.info("Started memstore flush for " + this +
-        ", current region memstore size " +
-        StringUtils.humanReadableInt(this.memstoreSize.get()) +
-        ((wal != null)? "": "; wal is null, using passed sequenceid=" + myseqid));
-
"" : "; wal is null, using passed sequenceid=" + myseqid)); + for (Store store: storesToFlush) { + LOG.info("Flushing Column Family: " + store.getColumnFamilyName() + " which was occupying " + + StringUtils.humanReadableInt(store.getMemStoreSize()) + " of memstore."); + } // Stop updates while we snapshot the memstore of all stores. We only have // to do this for a moment. Its quick. The subsequent sequence id that // goes into the HLog after we've flushed all these snapshots also goes @@ -1707,9 +1811,28 @@ public class HRegion implements HeapSize { // , Writable{ status.setStatus("Obtaining lock to block concurrent updates"); // block waiting for the lock for internal flush this.updatesLock.writeLock().lock(); - long totalFlushableSize = 0; status.setStatus("Preparing to flush by snapshotting stores"); - List storeFlushCtxs = new ArrayList(stores.size()); + long totalFlushableSizeOfFlushableStores = 0; + + Set storesNotToFlush = new HashSet(stores.values()); + storesNotToFlush.removeAll(storesToFlush); + + // Calculate the smallest LSN numbers for edits in the stores that will + // be flushed and the ones which won't be. This will be used to populate + // the firstSeqWrittenInCurrentMemstore and + // firstSeqWrittenInSnapshotMemstore maps correctly. + long smallestSeqIdInStoresToFlush = Long.MAX_VALUE; + for (Store store: storesToFlush) { + smallestSeqIdInStoresToFlush = Math.min(smallestSeqIdInStoresToFlush, + store.getSmallestSeqNumberInMemstore()); + } + + long smallestSeqIdInStoresNotToFlush = Long.MAX_VALUE; + for (Store store: storesNotToFlush) { + smallestSeqIdInStoresNotToFlush = Math.min(smallestSeqIdInStoresNotToFlush, + store.getSmallestSeqNumberInMemstore()); + } + List storeFlushCtxs = new ArrayList(storesToFlush.size()); long flushSeqId = -1L; try { // Record the mvcc for all transactions in progress. @@ -1717,20 +1840,22 @@ public class HRegion implements HeapSize { // , Writable{ mvcc.advanceMemstore(w); // check if it is not closing. if (wal != null) { - if (!wal.startCacheFlush(this.getRegionInfo().getEncodedNameAsBytes())) { + Long startSeqId = wal.startCacheFlush(this.getRegionInfo().getEncodedNameAsBytes(), + smallestSeqIdInStoresToFlush, smallestSeqIdInStoresNotToFlush, sequenceId); + if (startSeqId == null) { String msg = "Flush will not be started for [" + this.getRegionInfo().getEncodedName() + "] - because the WAL is closing."; status.setStatus(msg); return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg); } - flushSeqId = this.sequenceId.incrementAndGet(); + flushSeqId = startSeqId.longValue(); } else { // use the provided sequence Id as WAL is not being used for this flush. flushSeqId = myseqid; } - for (Store s : stores.values()) { - totalFlushableSize += s.getFlushableSize(); + for (Store s : storesToFlush) { + totalFlushableSizeOfFlushableStores += s.getFlushableSize(); storeFlushCtxs.add(s.createFlushContext(flushSeqId)); } @@ -1742,7 +1867,7 @@ public class HRegion implements HeapSize { // , Writable{ this.updatesLock.writeLock().unlock(); } String s = "Finished memstore snapshotting " + this + - ", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSize; + ", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSizeOfFlushableStores; status.setStatus(s); if (LOG.isTraceEnabled()) LOG.trace(s); @@ -1789,7 +1914,7 @@ public class HRegion implements HeapSize { // , Writable{ storeFlushCtxs.clear(); // Set down the memstore size by amount of flush. 
-      this.addAndGetGlobalMemstoreSize(-totalFlushableSize);
+      this.addAndGetGlobalMemstoreSize(-totalFlushableSizeOfFlushableStores);
     } catch (Throwable t) {
       // An exception here means that the snapshot was not persisted.
       // The hlog needs to be replayed so its content is restored to memstore.
@@ -1813,7 +1938,9 @@
     }
 
     // Record latest flush time
-    this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis();
+    for (Store store : storesToFlush) {
+      this.lastStoreFlushTimeMap.put(store, startTime);
+    }
 
     // Update the last flushed sequence id for region
     completeSequenceId = flushSeqId;
@@ -1827,7 +1954,7 @@
     long time = EnvironmentEdgeManager.currentTimeMillis() - startTime;
     long memstoresize = this.memstoreSize.get();
     String msg = "Finished memstore flush of ~" +
-        StringUtils.humanReadableInt(totalFlushableSize) + "/" + totalFlushableSize +
+        StringUtils.humanReadableInt(totalFlushableSizeOfFlushableStores) + "/" +
+        totalFlushableSizeOfFlushableStores +
         ", currentsize=" +
         StringUtils.humanReadableInt(memstoresize) + "/" + memstoresize +
         " for region " + this + " in " + time + "ms, sequenceid=" + flushSeqId +
         ((wal == null)? "; wal=null": "");
     LOG.info(msg);
     status.setStatus(msg);
-    this.recentFlushes.add(new Pair<Long, Long>(time/1000, totalFlushableSize));
+    this.recentFlushes.add(new Pair<Long, Long>(time/1000, totalFlushableSizeOfFlushableStores));
 
     return new FlushResult(compactionRequested ? FlushResult.Result.FLUSHED_COMPACTION_NEEDED :
         FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED, flushSeqId);
@@ -2327,7 +2454,7 @@
     long currentNonceGroup = HConstants.NO_NONCE, currentNonce = HConstants.NO_NONCE;
     WALEdit walEdit = new WALEdit(isInReplay);
     MultiVersionConsistencyControl.WriteEntry w = null;
-    long txid = 0;
+    HLog.TxidAndSeqNum txidAndSeqNum = HLog.DUMMY_TXID_AND_SEQ_NUM;
     boolean doRollBackMemstore = false;
     boolean locked = false;
 
      // ------------------------------------
-     // STEP 3. Write back to memstore
-     // Write to memstore. It is ok to write to memstore
-     // first without updating the HLog because we do not roll
-     // forward the memstore MVCC. The MVCC will be moved up when
-     // the complete operation is done. These changes are not yet
-     // visible to scanners till we update the MVCC. The MVCC is
-     // moved only when the sync is complete.
-     // ----------------------------------
-     long addedSize = 0;
-     for (int i = firstIndex; i < lastIndexExclusive; i++) {
-       if (batchOp.retCodeDetails[i].getOperationStatusCode()
-           != OperationStatusCode.NOT_RUN) {
-         continue;
-       }
-       doRollBackMemstore = true; // If we have a failure, we need to clean what we wrote
-       addedSize += applyFamilyMapToMemstore(familyMaps[i], w);
-     }
-
-     // ------------------------------------
-     // STEP 4. Build WAL edit
+     // STEP 3. Build WAL edit
      // ----------------------------------
      boolean hasWalAppends = false;
      Durability durability = Durability.USE_DEFAULT;
@@ -2506,7 +2614,6 @@
            != OperationStatusCode.NOT_RUN) {
          continue;
        }
-       batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
 
        Mutation m = batchOp.getMutation(i);
        Durability tmpDur = getEffectiveDurability(m.getDurability());
@@ -2529,7 +2636,7 @@
            throw new IOException("Multiple nonces per batch and not in replay");
          }
          // txid should always increase, so having the one from the last call is ok.
-         txid = this.log.appendNoSync(this.getRegionInfo(), htableDescriptor.getTableName(),
+         txidAndSeqNum = this.log.appendNoSync(this.getRegionInfo(), htableDescriptor.getTableName(),
            walEdit, m.getClusterIds(), now, htableDescriptor, this.sequenceId, true,
            currentNonceGroup, currentNonce);
          hasWalAppends = true;
@@ -2550,15 +2657,37 @@
      }
 
      // -------------------------
-     // STEP 5. Append the final edit to WAL. Do not sync wal.
+     // STEP 4. Append the final edit to WAL. Do not sync wal.
      // -------------------------
      Mutation mutation = batchOp.getMutation(firstIndex);
      if (walEdit.size() > 0) {
-       txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(),
+       txidAndSeqNum = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getTableName(),
              walEdit, mutation.getClusterIds(), now, this.htableDescriptor, this.sequenceId,
              true, currentNonceGroup, currentNonce);
        hasWalAppends = true;
      }
+
+     // ------------------------------------
+     // STEP 5. Write back to memstore
+     // Write to memstore. It is ok to write to memstore
+     // first without syncing the HLog because we do not roll
+     // forward the memstore MVCC. The MVCC will be moved up when
+     // the complete operation is done. These changes are not yet
+     // visible to scanners till we update the MVCC. The MVCC is
+     // moved only when the sync is complete.
+     // Per-CF flush needs to record the txid per store, so we write to memstore
+     // after logging.
+     // ----------------------------------
+     long addedSize = 0;
+     for (int i = firstIndex; i < lastIndexExclusive; i++) {
+       if (batchOp.retCodeDetails[i].getOperationStatusCode()
+           != OperationStatusCode.NOT_RUN) {
+         continue;
+       }
+       batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
+       doRollBackMemstore = true; // If we have a failure, we need to clean what we wrote
+       addedSize += applyFamilyMapToMemstore(familyMaps[i], w, txidAndSeqNum.seqNum);
+     }
 
      // -------------------------------
      // STEP 6. Release row locks, etc.
@@ -2573,7 +2702,7 @@
      // STEP 7. Sync wal.
      // -------------------------
      if (hasWalAppends) {
-       syncOrDefer(txid, durability);
+       syncOrDefer(txidAndSeqNum.txid, durability);
      }
      doRollBackMemstore = false;
      // calling the post CP hook for batch mutation
@@ -2980,11 +3109,12 @@
    * @param familyMap Map of kvs per family
    * @param localizedWriteEntry The WriteEntry of the MVCC for this transaction.
    *        If null, then this method internally creates a mvcc transaction.
+   * @param seqNum The log sequence number associated with the edits.
    * @return the additional memory usage of the memstore caused by the
    * new entries.
   */
  private long applyFamilyMapToMemstore(Map<byte[], List<Cell>> familyMap,
-    MultiVersionConsistencyControl.WriteEntry localizedWriteEntry) {
+    MultiVersionConsistencyControl.WriteEntry localizedWriteEntry, long seqNum) {
    long size = 0;
    boolean freemvcc = false;
@@ -3004,7 +3134,7 @@
          Cell cell = cells.get(i);
          KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
          kv.setMvccVersion(localizedWriteEntry.getWriteNumber());
-         size += store.add(kv);
+         size += store.add(kv, seqNum);
        }
      }
    } finally {
@@ -3137,7 +3267,9 @@
        writestate.flushRequested = true;
      }
      // Make request outside of synchronize block; HBASE-818.
-     this.rsServices.getFlushRequester().requestFlush(this);
+     // Request a selective flush of the memstore, if possible.
+     // This function is called by put(), delete(), etc.
+     this.rsServices.getFlushRequester().requestFlush(this, this.perColumnFamilyFlushEnabled);
      if (LOG.isDebugEnabled()) {
        LOG.debug("Flush requested on " + this);
      }
@@ -3152,6 +3284,15 @@
  }
 
  /**
+  * @param store the store to check
+  * @return true if the size of the store is above the flush threshold for column families
+  */
+ private boolean shouldFlushStore(Store store) {
+   return store.getMemStoreSize() > this.columnfamilyMemstoreFlushSize;
+ }
+
+ /**
   * Read the edits log put under this region by wal log splitting process. Put
   * the recovered edits back up into this region.
   *
@@ -3395,9 +3536,11 @@
          // Once we are over the limit, restoreEdit will keep returning true to
          // flush -- but don't flush until we've played all the kvs that make up
          // the WALEdit.
-         flush = restoreEdit(store, kv);
+         flush = restoreEdit(store, kv, key.getLogSeqNum());
          editsCount++;
        }
+       // We do not want to write to the WAL again, hence we set the WAL
+       // parameter to null.
        if (flush) internalFlushcache(null, currentEditSeqId, status);
 
        if (coprocessorHost != null) {
@@ -3465,10 +3608,11 @@
   * Used by tests
   * @param s Store to add edit too.
   * @param kv KeyValue to add.
+  * @param seqNum The sequence number for the edit.
   * @return True if we should flush.
   */
- protected boolean restoreEdit(final Store s, final KeyValue kv) {
-   long kvSize = s.add(kv);
+ protected boolean restoreEdit(final Store s, final KeyValue kv, long seqNum) {
+   long kvSize = s.add(kv, seqNum);
    if (this.rsAccounting != null) {
      rsAccounting.addAndGetRegionReplayEditsSize(this.getRegionName(), kvSize);
    }
@@ -5018,24 +5162,26 @@
        writeEntry = mvcc.beginMemstoreInsert();
        // 6. Call the preBatchMutate hook
        processor.preBatchMutate(this, walEdit);
-       // 7. Apply to memstore
+
+       HLog.TxidAndSeqNum txidAndSeqNum = HLog.DUMMY_TXID_AND_SEQ_NUM;
+       // 7. Append no sync
+       if (!walEdit.isEmpty()) {
+         txidAndSeqNum = this.log.appendNoSync(this.getRegionInfo(),
+             this.htableDescriptor.getTableName(), walEdit, processor.getClusterIds(), now,
+             this.htableDescriptor, this.sequenceId, true, nonceGroup, nonce);
+       }
+
+       // 8. Apply to memstore
        for (Mutation m : mutations) {
          for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
            KeyValue kv = KeyValueUtil.ensureKeyValue(cellScanner.current());
            kv.setMvccVersion(writeEntry.getWriteNumber());
            byte[] family = kv.getFamily();
            checkFamily(family);
-           addedSize += stores.get(family).add(kv);
+           addedSize += stores.get(family).add(kv, txidAndSeqNum.seqNum);
          }
        }
-       long txid = 0;
-       // 8. Append no sync
-       if (!walEdit.isEmpty()) {
-         txid = this.log.appendNoSync(this.getRegionInfo(),
-             this.htableDescriptor.getTableName(), walEdit, processor.getClusterIds(), now,
-             this.htableDescriptor, this.sequenceId, true, nonceGroup, nonce);
-       }
        // 9. Release region lock
        if (locked) {
          this.updatesLock.readLock().unlock();
@@ -5046,8 +5192,8 @@
        releaseRowLocks(acquiredRowLocks);
 
        // 11. Sync edit log
-       if (txid != 0) {
-         syncOrDefer(txid, getEffectiveDurability(processor.useDurability()));
+       if (txidAndSeqNum.txid != 0) {
+         syncOrDefer(txidAndSeqNum.txid, getEffectiveDurability(processor.useDurability()));
        }
        walSyncSuccessful = true;
        // 12. call postBatchMutate hook
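The reordering of steps 3-5 above is the heart of the write-path change: the WAL append now happens before the memstore write, so the log sequence number is already known when the cells are added to each store. A condensed, illustrative sketch of the resulting ordering (the names come from this patch's APIs; the parameters are assumed to be supplied by the surrounding batch machinery, and locking, MVCC, and rollback handling are elided):

    // Sketch only, not code from the patch.
    HLog.TxidAndSeqNum t = HLog.DUMMY_TXID_AND_SEQ_NUM;
    if (!walEdit.isEmpty()) {
      // 1. Stage the edit in the WAL without syncing; this assigns the log
      //    sequence number that the memstore entries must carry.
      t = log.appendNoSync(regionInfo, tableName, walEdit, clusterIds, now,
          htd, sequenceId, true, nonceGroup, nonce);
    }
    // 2. Only now apply the edit to the memstore, tagging the store with the
    //    seqNum so each store can track its smallest unflushed LSN.
    long addedSize = store.add(kv, t.seqNum);
    // 3. Sync the WAL (or defer, per durability) before acknowledging.
    syncOrDefer(t.txid, durability);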
@@ -5167,7 +5313,7 @@
     Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();
 
     long size = 0;
-    long txid = 0;
+    HLog.TxidAndSeqNum txidAndSeqNum = HLog.DUMMY_TXID_AND_SEQ_NUM;
 
     checkReadOnly();
     checkResources();
@@ -5287,7 +5433,7 @@
             // Using default cluster id, as this can only happen in the orginating
             // cluster. A slave cluster receives the final value (not the delta)
             // as a Put.
-            txid = this.log.appendNoSync(this.getRegionInfo(),
+            txidAndSeqNum = this.log.appendNoSync(this.getRegionInfo(),
               this.htableDescriptor.getTableName(), walEdits, new ArrayList<UUID>(),
               EnvironmentEdgeManager.currentTimeMillis(), this.htableDescriptor,
               this.sequenceId, true, nonceGroup, nonce);
@@ -5300,12 +5446,12 @@
             Store store = entry.getKey();
             if (store.getFamily().getMaxVersions() == 1) {
               // upsert if VERSIONS for this CF == 1
-              size += store.upsert(entry.getValue(), getSmallestReadPoint());
+              size += store.upsert(entry.getValue(), getSmallestReadPoint(), txidAndSeqNum.seqNum);
             } else {
               // otherwise keep older versions around
               for (Cell cell: entry.getValue()) {
                 KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
-                size += store.add(kv);
+                size += store.add(kv, txidAndSeqNum.seqNum);
               }
             }
             allKVs.addAll(entry.getValue());
@@ -5320,7 +5466,7 @@
       }
       if (writeToWAL) {
         // sync the transaction log outside the rowlock
-        syncOrDefer(txid, durability);
+        syncOrDefer(txidAndSeqNum.txid, durability);
       }
     } finally {
       if (w != null) {
@@ -5365,7 +5511,7 @@
     Map<Store, List<Cell>> tempMemstore = new HashMap<Store, List<Cell>>();
 
     long size = 0;
-    long txid = 0;
+    HLog.TxidAndSeqNum txidAndSeqNum = HLog.DUMMY_TXID_AND_SEQ_NUM;
 
     checkReadOnly();
     checkResources();
@@ -5486,7 +5632,7 @@
           // Using default cluster id, as this can only happen in the orginating
           // cluster. A slave cluster receives the final value (not the delta)
           // as a Put.
-          txid = this.log.appendNoSync(this.getRegionInfo(),
+          txidAndSeqNum = this.log.appendNoSync(this.getRegionInfo(),
             this.htableDescriptor.getTableName(), walEdits, new ArrayList<UUID>(),
             EnvironmentEdgeManager.currentTimeMillis(), this.htableDescriptor,
             this.sequenceId, true, nonceGroup, nonce);
@@ -5500,12 +5646,12 @@
           Store store = entry.getKey();
           if (store.getFamily().getMaxVersions() == 1) {
             // upsert if VERSIONS for this CF == 1
-            size += store.upsert(entry.getValue(), getSmallestReadPoint());
+            size += store.upsert(entry.getValue(), getSmallestReadPoint(), txidAndSeqNum.seqNum);
           } else {
             // otherwise keep older versions around
             for (Cell cell : entry.getValue()) {
               KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
-              size += store.add(kv);
+              size += store.add(kv, txidAndSeqNum.seqNum);
             }
           }
         }
@@ -5520,7 +5666,7 @@
       }
       if (writeToWAL && (walEdits != null) && !walEdits.isEmpty()) {
         // sync the transaction log outside the rowlock
-        syncOrDefer(txid, durability);
+        syncOrDefer(txidAndSeqNum.txid, durability);
       }
     } finally {
       if (w != null) {
@@ -5556,7 +5702,7 @@
   public static final long FIXED_OVERHEAD = ClassSize.align(
       ClassSize.OBJECT +
       ClassSize.ARRAY +
-      41 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
+      42 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
       (12 * Bytes.SIZEOF_LONG) +
       4 * Bytes.SIZEOF_BOOLEAN);
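Stepping back, the store-selection rule that flushcache(true) implements in the HRegion changes above can be restated in isolation. A self-contained, illustrative helper (the method and its parameters are hypothetical; the predicate and the fall-back-to-all behavior mirror the patch):

    import java.util.Collection;
    import java.util.HashSet;
    import java.util.Set;

    static Collection<Store> chooseStoresToFlush(Collection<Store> stores,
        long columnfamilyMemstoreFlushSize) {
      Set<Store> picked = new HashSet<Store>();
      for (Store store : stores) {
        // Same predicate as HRegion.shouldFlushStore().
        if (store.getMemStoreSize() > columnfamilyMemstoreFlushSize) {
          picked.add(store);
        }
      }
      // If no column family dominates, flush all of them, exactly as the
      // non-selective path would.
      return picked.isEmpty() ? stores : picked;
    }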
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index ff86b00..0c42ad8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -3984,7 +3984,7 @@
       LOG.info("Flushing " + region.getRegionNameAsString());
       boolean shouldFlush = true;
       if (request.hasIfOlderThanTs()) {
-        shouldFlush = region.getLastFlushTime() < request.getIfOlderThanTs();
+        shouldFlush = region.getMinFlushTimeForAllStores() < request.getIfOlderThanTs();
       }
       FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder();
       if (shouldFlush) {
@@ -3995,7 +3995,7 @@
         }
         builder.setFlushed(result);
       }
-      builder.setLastFlushTime(region.getLastFlushTime());
+      builder.setLastFlushTime(region.getMinFlushTimeForAllStores());
       return builder.build();
     } catch (DroppedSnapshotException ex) {
       // Cache flush can fail in a few places. If it fails in a critical

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index c6b62d4..5f84a40 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -567,10 +567,10 @@
   }
 
   @Override
-  public long add(final KeyValue kv) {
+  public long add(final KeyValue kv, long seqNum) {
     lock.readLock().lock();
     try {
-      return this.memstore.add(kv);
+      return this.memstore.add(kv, seqNum);
     } finally {
       lock.readLock().unlock();
     }
@@ -587,10 +587,10 @@
    * @param kv
    * @return memstore size delta
    */
-  protected long delete(final KeyValue kv) {
+  protected long delete(final KeyValue kv, long seqNum) {
     lock.readLock().lock();
     try {
-      return this.memstore.delete(kv);
+      return this.memstore.delete(kv, seqNum);
     } finally {
       lock.readLock().unlock();
     }
@@ -1888,9 +1888,19 @@
   @Override
   public long getMemStoreSize() {
-    return this.memstore.heapSize();
+    // Use memstore.keySize() instead of heapSize() since heapSize() gives the
+    // size of the keys + size of the map.
+    return this.memstore.keySize();
   }
 
+  /**
+   * A helper function to get the smallest LSN in the memstore.
+   * @return the smallest sequence number among the edits in the memstore
+   */
+  public long getSmallestSeqNumberInMemstore() {
+    return this.memstore.getSmallestSeqNumber();
+  }
+
   @Override
   public int getCompactPriority() {
     int priority = this.storeEngine.getStoreFileManager().getStoreCompactionPriority();
@@ -1939,11 +1949,12 @@
    * @param f family to update
    * @param qualifier qualifier to update
    * @param newValue the new value to set into memstore
+   * @param seqNum The LSN associated with the edit.
    * @return memstore size delta
    * @throws IOException
    */
   public long updateColumnValue(byte [] row, byte [] f,
-                                byte [] qualifier, long newValue)
+                                byte [] qualifier, long newValue, long seqNum)
       throws IOException {
 
     this.lock.readLock().lock();
     try {
       return this.memstore.updateColumnValue(row,
           f,
           qualifier,
           newValue,
-          now);
+          now,
+          seqNum);
     } finally {
       this.lock.readLock().unlock();
     }
   }
 
   @Override
-  public long upsert(Iterable<Cell> cells, long readpoint) throws IOException {
+  public long upsert(Iterable<Cell> cells, long readpoint, long seqNum) throws IOException {
     this.lock.readLock().lock();
     try {
-      return this.memstore.upsert(cells, readpoint);
+      return this.memstore.upsert(cells, readpoint, seqNum);
     } finally {
       this.lock.readLock().unlock();
     }
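A note on the getMemStoreSize() change above: keySize() and heapSize() measure different things, and the per-CF threshold is meant to bound user data, not heap overhead. Illustrative only (memstore is assumed to be an in-scope MemStore instance):

    long dataBytes = memstore.keySize();   // the KeyValues themselves
    long heapBytes = memstore.heapSize();  // KeyValues plus skip-list/map overhead
    // heapBytes >= dataBytes always holds; comparing heapBytes against the
    // per-CF threshold would trip it before the CF holds that much user data.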
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
index 0e98143..3978eff 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java
@@ -130,7 +130,10 @@
       if (r != null) {
         requester = this.services.getFlushRequester();
         if (requester != null) {
-          requester.requestFlush(r);
+          // If we do a selective flush, some column families might remain in
+          // the memstore for a long time, and might cause old logs to
+          // accumulate. Hence, we do not request a selective flush here.
+          requester.requestFlush(r, false);
           scheduled = true;
         }
       }

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
index e365cc9..cf34f24 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
@@ -82,6 +82,9 @@
   // Snapshot of memstore.  Made for flusher.
   volatile KeyValueSkipListSet snapshot;
 
+  // Smallest LSN amongst all the edits in the Memstore
+  volatile AtomicLong smallestSeqNumber = new AtomicLong();
+
   final KeyValue.KVComparator comparator;
 
   // Used to track own heapSize
@@ -126,6 +129,7 @@
       this.allocator = null;
       this.chunkPool = null;
     }
+    this.smallestSeqNumber.set(Long.MAX_VALUE);
   }
 
   void dump() {
@@ -138,6 +142,31 @@
   }
 
   /**
+   * Get the smallest LSN
+   * @return the smallest LSN among the edits currently in the memstore
+   */
+  long getSmallestSeqNumber() {
+    return smallestSeqNumber.get();
+  }
+
+  /**
+   * Update the smallest LSN
+   * @param seqNum
+   */
+  void updateSmallestSeqNumber(long seqNum) {
+    if (seqNum < 0) {
+      return;
+    }
+
+    // Do a Compare-and-Set instead of synchronized here.
+    long smallestSeqNumberVal;
+    do {
+      smallestSeqNumberVal = smallestSeqNumber.get();
+    } while (!smallestSeqNumber.compareAndSet(smallestSeqNumberVal,
+        Math.min(smallestSeqNumberVal, seqNum)));
+  }
+
+  /**
    * Creates a snapshot of the current memstore.
    * Snapshot must be cleared by call to {@link #clearSnapshot(SortedSet)}
    * To get the snapshot made by this method, use {@link #getSnapshot()}
@@ -163,6 +192,8 @@
           this.allocator = new MemStoreLAB(conf, chunkPool);
         }
         timeOfOldestEdit = Long.MAX_VALUE;
+        // Reset the smallest sequence number
+        this.smallestSeqNumber.set(Long.MAX_VALUE);
       }
     }
   }
@@ -221,13 +252,25 @@
   }
 
   /**
+   * Write an update.
+   * This method should only be used by tests, since it does not specify the
+   * LSN for the edit.
+   * @param kv
+   * @return approximate size of the passed key and value
+   */
+  long add(final KeyValue kv) {
+    return add(kv, -1L);
+  }
+
+  /**
    * Write an update
    * @param kv
+   * @param seqNum
    * @return approximate size of the passed key and value.
    */
-  long add(final KeyValue kv) {
+  long add(final KeyValue kv, long seqNum) {
     KeyValue toAdd = maybeCloneWithAllocator(kv);
-    return internalAdd(toAdd);
+    return internalAdd(toAdd, seqNum);
   }
 
   long timeOfOldestEdit() {
@@ -258,10 +301,11 @@
    *
    * Callers should ensure they already have the read lock taken
    */
-  private long internalAdd(final KeyValue toAdd) {
+  private long internalAdd(final KeyValue toAdd, long seqNum) {
     long s = heapSizeChange(toAdd, addToKVSet(toAdd));
     timeRangeTracker.includeTimestamp(toAdd);
     this.size.addAndGet(s);
+    updateSmallestSeqNumber(seqNum);
     return s;
   }
 
@@ -314,16 +358,27 @@
   }
 
   /**
+   * Should only be used in tests, since it does not provide a seqNum.
+   * @param delete
+   * @return approximate size of the passed key and value
+   */
+  long delete(final KeyValue delete) {
+    return delete(delete, -1L);
+  }
+
+  /**
    * Write a delete
    * @param delete
+   * @param seqNum
    * @return approximate size of the passed key and value.
    */
-  long delete(final KeyValue delete) {
+  long delete(final KeyValue delete, long seqNum) {
     long s = 0;
     KeyValue toAdd = maybeCloneWithAllocator(delete);
     s += heapSizeChange(toAdd, addToKVSet(toAdd));
     timeRangeTracker.includeTimestamp(toAdd);
     this.size.addAndGet(s);
+    updateSmallestSeqNumber(seqNum);
     return s;
   }
 
@@ -466,13 +521,15 @@
    * @param qualifier
    * @param newValue
    * @param now
+   * @param seqNum The LSN for the edit
    * @return Timestamp
    */
   long updateColumnValue(byte[] row,
                          byte[] family,
                          byte[] qualifier,
                          long newValue,
-                         long now) {
+                         long now,
+                         long seqNum) {
     KeyValue firstKv = KeyValue.createFirstOnRow(
         row, family, qualifier);
     // Is there a KeyValue in 'snapshot' with the same TS? If so, upgrade the timestamp a bit.
@@ -512,7 +569,11 @@
     // 'now' and a 0 memstoreTS == immediately visible
     List<Cell> cells = new ArrayList<Cell>(1);
     cells.add(new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue)));
-    return upsert(cells, 1L);
+    return upsert(cells, 1L, seqNum);
+  }
+
+  long upsert(Iterable<Cell> cells, long readpoint) {
+    return upsert(cells, readpoint, -1L);
   }
 
   /**
@@ -533,10 +594,10 @@
    * @param readpoint readpoint below which we can safely remove duplicate KVs
    * @param seqNum The LSN for the edits
    * @return change in memstore size
    */
-  public long upsert(Iterable<Cell> cells, long readpoint) {
+  public long upsert(Iterable<Cell> cells, long readpoint, long seqNum) {
     long size = 0;
     for (Cell cell : cells) {
-      size += upsert(cell, readpoint);
+      size += upsert(cell, readpoint, seqNum);
     }
     return size;
   }
@@ -555,7 +616,7 @@
    * @param cell
    * @return change in size of MemStore
    */
-  private long upsert(Cell cell, long readpoint) {
+  private long upsert(Cell cell, long readpoint, long seqNum) {
     // Add the KeyValue to the MemStore
     // Use the internalAdd method here since we (a) already have a lock
     // and (b) cannot safely use the MSLAB here without potentially
     // hitting OOME - see TestMemStore.testUpsertMSLAB for a
     // test that triggers the pathological case if we don't avoid MSLAB
     // here.
     KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
-    long addedSize = internalAdd(kv);
+    long addedSize = internalAdd(kv, seqNum);
 
     // Get the KeyValues for the row/family/qualifier regardless of timestamp.
     // For this case we want to clean up any other puts
@@ -1001,7 +1062,7 @@
   }
 
   public final static long FIXED_OVERHEAD = ClassSize.align(
-      ClassSize.OBJECT + (10 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG));
+      ClassSize.OBJECT + (11 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_LONG));
 
   public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
       ClassSize.ATOMIC_LONG + (2 * ClassSize.TIMERANGE_TRACKER) +
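updateSmallestSeqNumber() above uses a compare-and-set retry loop instead of synchronization. The same lock-free "atomic minimum" in isolation, as a runnable sketch (the class name is hypothetical; the behavior matches the patch's loop):

    import java.util.concurrent.atomic.AtomicLong;

    final class AtomicMin {
      private final AtomicLong min = new AtomicLong(Long.MAX_VALUE);

      void offer(long candidate) {
        long current;
        do {
          current = min.get();
          if (candidate >= current) {
            return; // already at or below the candidate; nothing to do
          }
        } while (!min.compareAndSet(current, candidate));
      }

      long get() {
        return min.get();
      }
    }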
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
index 4c7bb0b..7153d26 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
@@ -208,7 +208,7 @@
       Preconditions.checkState(regionToFlush.memstoreSize.get() > 0);
 
       LOG.info("Flush of region " + regionToFlush + " due to global heap pressure");
-      flushedOne = flushRegion(regionToFlush, true);
+      flushedOne = flushRegion(regionToFlush, true, false);
       if (!flushedOne) {
         LOG.info("Excluding unflushable region " + regionToFlush +
             " - trying to find a different region to flush.");
@@ -322,11 +322,15 @@
   }
 
   public void requestFlush(HRegion r) {
+    requestFlush(r, false);
+  }
+
+  public void requestFlush(HRegion r, boolean selectiveFlushRequest) {
     synchronized (regionsInQueue) {
       if (!regionsInQueue.containsKey(r)) {
         // This entry has no delay so it will be added at the top of the flush
         // queue.  It'll come out near immediately.
-        FlushRegionEntry fqe = new FlushRegionEntry(r);
+        FlushRegionEntry fqe = new FlushRegionEntry(r, selectiveFlushRequest);
         this.regionsInQueue.put(r, fqe);
         this.flushQueue.add(fqe);
       }
@@ -337,7 +341,7 @@
     synchronized (regionsInQueue) {
       if (!regionsInQueue.containsKey(r)) {
         // This entry has some delay
-        FlushRegionEntry fqe = new FlushRegionEntry(r);
+        FlushRegionEntry fqe = new FlushRegionEntry(r, false);
         fqe.requeue(delay);
         this.regionsInQueue.put(r, fqe);
         this.flushQueue.add(fqe);
@@ -432,7 +436,7 @@
         return true;
       }
     }
-    return flushRegion(region, false);
+    return flushRegion(region, false, fqe.isSelectiveFlushRequest());
   }
 
   /*
    * needs to be removed from the flush queue. If false, when we were called
    * from the main flusher run loop and we got the entry to flush by calling
    * poll on the flush queue (which removed it).
+   * @param selectiveFlushRequest Do we want to selectively flush only the
+   *                              column families that dominate the memstore size?
    * @return true if the region was successfully flushed, false otherwise. If
    * false, there will be accompanying log messages explaining why the log was
    * not flushed.
   */
-  private boolean flushRegion(final HRegion region, final boolean emergencyFlush) {
+  private boolean flushRegion(final HRegion region, final boolean emergencyFlush,
+      boolean selectiveFlushRequest) {
     synchronized (this.regionsInQueue) {
       FlushRegionEntry fqe = this.regionsInQueue.remove(region);
       if (fqe != null && emergencyFlush) {
     }
     lock.readLock().lock();
     try {
-      boolean shouldCompact = region.flushcache().isCompactionNeeded();
+      boolean shouldCompact = region.flushcache(selectiveFlushRequest).isCompactionNeeded();
       // We just want to check the size
       boolean shouldSplit = region.checkSplit() != null;
       if (shouldSplit) {
@@ -612,13 +618,30 @@
     private final long createTime;
     private long whenToExpire;
     private int requeueCount = 0;
+    private boolean selectiveFlushRequest;
 
-    FlushRegionEntry(final HRegion r) {
+    /**
+     * @param r The region to flush
+     * @param selectiveFlushRequest Do we want to flush only the column
+     *                              families that dominate the memstore size,
+     *                              i.e., do a selective flush? If we are
+     *                              doing log rolling, then we should not do a
+     *                              selective flush.
+     */
+    FlushRegionEntry(final HRegion r, boolean selectiveFlushRequest) {
       this.region = r;
       this.createTime = System.currentTimeMillis();
       this.whenToExpire = this.createTime;
+      this.selectiveFlushRequest = selectiveFlushRequest;
     }
-
+
+    /**
+     * @return Is this a request for a selective flush?
+     */
+    public boolean isSelectiveFlushRequest() {
+      return selectiveFlushRequest;
+    }
+
     /**
      * @param maximumWait
      * @return True if we have been delayed > maximumWait milliseconds.
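The selectiveFlushRequest flag threaded through MemStoreFlusher above travels end to end roughly like this (hypothetical call sites; the region and flusher objects are assumed to exist):

    flusher.requestFlush(region, true);   // write-path pressure: selective flush allowed
    flusher.requestFlush(region, false);  // log roll / global pressure: full flush
    // The queued entry later resurfaces in the flush loop as
    //   flushRegion(region, false, fqe.isSelectiveFlushRequest());
    // and finally reaches HRegion.flushcache(selectiveFlushRequest).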
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index 770ab75..f2ac5af 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -114,17 +114,19 @@
    * across all of them.
    * @param cells
    * @param readpoint readpoint below which we can safely remove duplicate KVs
+   * @param seqNum The LSN associated with the key.
    * @return memstore size delta
    * @throws IOException
    */
-  long upsert(Iterable<Cell> cells, long readpoint) throws IOException;
+  long upsert(Iterable<Cell> cells, long readpoint, long seqNum) throws IOException;
 
   /**
    * Adds a value to the memstore
    * @param kv
+   * @param seqNum The LSN associated with the key.
    * @return memstore size delta
    */
-  long add(KeyValue kv);
+  long add(KeyValue kv, long seqNum);
 
   /**
    * When was the last edit done in the memstore
@@ -152,6 +154,12 @@
    */
   KeyValue getRowKeyAtOrBefore(final byte[] row) throws IOException;
 
+  /**
+   * A helper function to get the smallest LSN in the memstore.
+   * @return the smallest sequence number among the edits in the memstore
+   */
+  public long getSmallestSeqNumberInMemstore();
+
   FileSystem getFileSystem();
 
   /*

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index f96e645..78bd3c5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -36,7 +36,6 @@ import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
@@ -993,20 +992,21 @@
    * @throws IOException
    */
   @SuppressWarnings("deprecation")
-  private long append(HRegionInfo info, TableName tableName, WALEdit edits, List<UUID> clusterIds,
+  private TxidAndSeqNum append(HRegionInfo info, TableName tableName, WALEdit edits, List<UUID> clusterIds,
       final long now, HTableDescriptor htd, boolean doSync, boolean isInMemstore,
       AtomicLong sequenceId, long nonceGroup, long nonce) throws IOException {
-      if (edits.isEmpty()) return this.unflushedEntries.get();
+      if (edits.isEmpty()) return new TxidAndSeqNum(this.unflushedEntries.get(), sequenceId.get());
       if (this.closed) {
         throw new IOException("Cannot append; log is closed");
       }
       TraceScope traceScope = Trace.startSpan("FSHlog.append");
       try {
         long txid = 0;
+        long seqNum;
         synchronized (this.updateLock) {
           // get the sequence number from the passed Long. In normal flow, it is coming from the
           // region.
-          long seqNum = sequenceId.incrementAndGet();
+          seqNum = sequenceId.incrementAndGet();
           // The 'lastSeqWritten' map holds the sequence number of the oldest
           // write for each region (i.e. the first edit added to the particular
           // memstore). . When the cache is flushed, the entry for the
@@ -1041,14 +1041,14 @@
         // sync txn to file system
         this.sync(txid);
       }
-      return txid;
+      return new TxidAndSeqNum(txid, seqNum);
     } finally {
       traceScope.close();
     }
   }
 
   @Override
-  public long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits,
+  public TxidAndSeqNum appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits,
       List<UUID> clusterIds, final long now, HTableDescriptor htd, AtomicLong sequenceId,
       boolean isInMemstore, long nonceGroup, long nonce) throws IOException {
     return append(info, tableName, edits, clusterIds,
@@ -1536,19 +1536,42 @@
   }
 
   @Override
-  public boolean startCacheFlush(final byte[] encodedRegionName) {
+  public Long startCacheFlush(final byte[] encodedRegionName,
+      long oldestSeqIdInStoresToFlush,
+      long oldestSeqIdInStoresNotToFlush, AtomicLong sequenceId) {
+    long flushSeqId;
     Long oldRegionSeqNum = null;
     if (!closeBarrier.beginOp()) {
       LOG.info("Flush will not be started for " + Bytes.toString(encodedRegionName) +
         " - because the server is closing.");
-      return false;
+      return null;
     }
     synchronized (oldestSeqNumsLock) {
-      oldRegionSeqNum = this.oldestUnflushedSeqNums.remove(encodedRegionName);
-      if (oldRegionSeqNum != null) {
-        Long oldValue = this.oldestFlushingSeqNums.put(encodedRegionName, oldRegionSeqNum);
-        assert oldValue == null : "Flushing map not cleaned up for "
-          + Bytes.toString(encodedRegionName);
+      if (oldestSeqIdInStoresNotToFlush == Long.MAX_VALUE) {
+        oldRegionSeqNum = this.oldestUnflushedSeqNums.remove(encodedRegionName);
+        if (oldRegionSeqNum != null) {
+          Long oldValue = this.oldestFlushingSeqNums.put(encodedRegionName, oldRegionSeqNum);
+          assert oldValue == null: "Flushing map not cleaned up for "
+            + Bytes.toString(encodedRegionName);
+        }
+        flushSeqId = sequenceId.incrementAndGet();
+      } else {
+        // Amongst the Stores not being flushed, what is the smallest sequence
+        // number? Put that in this map.
+        oldRegionSeqNum = this.oldestUnflushedSeqNums.replace(encodedRegionName,
+            oldestSeqIdInStoresNotToFlush);
+
+        // Amongst the Stores being flushed, what is the smallest sequence
+        // number? Put that in this map.
+        this.oldestFlushingSeqNums.put(encodedRegionName, oldestSeqIdInStoresToFlush);
+
+        // During Log Replay, we can safely discard any edits that have
+        // the sequence number less than the smallest sequence id amongst the
+        // stores that we are not flushing. This might re-apply some edits
+        // which belonged to stores which are going to be flushed, but we
+        // expect these operations to be idempotent anyway, and this is
+        // simpler.
+        flushSeqId = oldestSeqIdInStoresNotToFlush - 1;
       }
     }
     if (oldRegionSeqNum == null) {
@@ -1560,12 +1583,11 @@
       LOG.warn("Couldn't find oldest seqNum for the region we are about to flush: ["
         + Bytes.toString(encodedRegionName) + "]");
     }
-    return true;
+    return flushSeqId;
   }
 
   @Override
-  public void completeCacheFlush(final byte [] encodedRegionName)
-  {
+  public void completeCacheFlush(final byte [] encodedRegionName) {
     synchronized (oldestSeqNumsLock) {
       this.oldestFlushingSeqNums.remove(encodedRegionName);
     }
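A worked example of the sequence-id bookkeeping in startCacheFlush() above, with made-up numbers. Suppose region R has stores A (being flushed) and B (not being flushed), with smallest unflushed LSN 17 in A and 23 in B. The selective branch records oldestUnflushedSeqNums[R] = 23 (the oldest edit the WAL still owes B), records oldestFlushingSeqNums[R] = 17 (protecting A's snapshot until the flush completes), and stamps A's flush file with flushSeqId = 23 - 1 = 22. On replay, edits at or below 22 can be skipped (A has persisted them and B never had any that old), while edits from 23 onward are replayed and may be re-applied to A, which is safe because the operations are idempotent. The branch itself reduces to:

    long flushSeqId = (oldestSeqIdInStoresNotToFlush == Long.MAX_VALUE)
        ? sequenceId.incrementAndGet()         // full flush: allocate a fresh id
        : oldestSeqIdInStoresNotToFlush - 1;   // partial flush: bounded by the
                                               // stores that stay in memory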
+ * @param encodedRegionName the encoded name of the region being flushed
+ * @param oldestSeqIdInStoresToFlush the smallest sequence id among the stores that will be flushed
+ * @param oldestSeqIdInStoresNotToFlush the smallest sequence id among the stores that will not be flushed
+ * @param sequenceId the sequence id of the region.
+ * @return current seqNum, to pass on to flushers (who will put it into the metadata of
+ * the resulting file as an upper-bound seqNum for that file), or null if flush
+ * should not be started.
 */
- boolean startCacheFlush(final byte[] encodedRegionName);
+ Long startCacheFlush(final byte[] encodedRegionName, long oldestSeqIdInStoresToFlush,
+ long oldestSeqIdInStoresNotToFlush, AtomicLong sequenceId);

 /**
 * Complete the cache flush.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
index 53ae1f7..0362bca 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
@@ -269,7 +269,7 @@ public class HLogUtil {
 long now = EnvironmentEdgeManager.currentTimeMillis();
 TableName tn = TableName.valueOf(c.getTableName().toByteArray());
 long txid = log.appendNoSync(info, tn, e, new ArrayList(), now, htd, sequenceId,
- false, HConstants.NO_NONCE, HConstants.NO_NONCE);
+ false, HConstants.NO_NONCE, HConstants.NO_NONCE).txid;
 log.sync(txid);

 if (LOG.isTraceEnabled()) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
index 26d08c0..51b7981 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
@@ -260,7 +260,7 @@ public class TestIOFencing {
 compactingRegion = (CompactionBlockerRegion)testRegions.get(0);
 LOG.info("Blocking compactions");
 compactingRegion.stopCompactions();
- long lastFlushTime = compactingRegion.getLastFlushTime();
+ long lastFlushTime = compactingRegion.getMinFlushTimeForAllStores();
 // Load some rows
 TEST_UTIL.loadNumericRows(table, FAMILY, 0, FIRST_BATCH_COUNT);
@@ -276,7 +276,7 @@
 // Wait till flush has happened, otherwise there won't be multiple store files
 long startWaitTime = System.currentTimeMillis();
- while (compactingRegion.getLastFlushTime() <= lastFlushTime ||
+ while (compactingRegion.getMinFlushTimeForAllStores() <= lastFlushTime ||
 compactingRegion.countStoreFiles() <= 1) {
 LOG.info("Waiting for the region to flush " +
 compactingRegion.getRegionNameAsString());
 Thread.sleep(1000);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPerColumnFamilyFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPerColumnFamilyFlush.java
new file mode 100644
index 0000000..96ad58d
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestPerColumnFamilyFlush.java
@@ -0,0 +1,528 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+import static org.junit.Assert.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This test verifies the correctness of the Per Column Family flushing strategy
+ */
+@Category(MediumTests.class)
+public class TestPerColumnFamilyFlush {
+ private static final Log LOG =
+ LogFactory.getLog(TestPerColumnFamilyFlush.class);
+ HRegion region = null;
+ private final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private final String DIR = TEST_UTIL.getClusterTestDir() +
+ "/TestHRegion/";
+ public static final TableName TABLENAME = TableName.valueOf("TestPerColumnFamilyFlush", "t1");
+
+ public static final byte[][] families = { Bytes.toBytes("f1"),
+ Bytes.toBytes("f2"), Bytes.toBytes("f3"), Bytes.toBytes("f4"),
+ Bytes.toBytes("f5") };
+ public static final byte[] FAMILY1 = families[0];
+ public static final byte[] FAMILY2 = families[1];
+ public static final byte[] FAMILY3 = families[2];
+
+ private void initHRegion(String callingMethod, Configuration conf) throws IOException {
+ HTableDescriptor htd = new HTableDescriptor(TABLENAME);
+ for(byte [] family : families) {
+ htd.addFamily(new HColumnDescriptor(family));
+ }
+ HRegionInfo info = new HRegionInfo(TABLENAME, null, null, false);
+ Path path = new Path(DIR + callingMethod);
+ region = HRegion.createHRegion(info, path, conf, htd);
+ }
+
+ // A helper function to create puts.
+ Put createPut(int familyNum, int putNum) {
+ byte[] qf = Bytes.toBytes("q" + familyNum);
+ byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum);
+ byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
+ Put p = new Put(row);
+ p.add(families[familyNum - 1], qf, val);
+ return p;
+ }
+
+ // A helper function to create gets.
+ Get createGet(int familyNum, int putNum) {
+ byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum);
+ return new Get(row);
+ }
+
+ // A helper function to verify edits.
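+ // It issues a Get for the row written by createPut() and asserts that
+ // the value stored under the family and qualifier matches.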
+ void verifyEdit(int familyNum, int putNum, HTable table) throws IOException {
+ Result r = table.get(createGet(familyNum, putNum));
+ byte[] family = families[familyNum - 1];
+ byte[] qf = Bytes.toBytes("q" + familyNum);
+ byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
+ assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum),
+ r.getFamilyMap(family));
+ assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum),
+ r.getFamilyMap(family).get(qf));
+ assertTrue(("Incorrect value for Put#" + putNum + " for CF# " + familyNum),
+ Arrays.equals(r.getFamilyMap(family).get(qf), val));
+ }
+
+ @Test
+ public void testSelectiveFlushWhenEnabled() throws IOException {
+ // Set up the configuration
+ Configuration conf = HBaseConfiguration.create();
+ conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 200*1024);
+ conf.setBoolean(HConstants.HREGION_MEMSTORE_PER_COLUMN_FAMILY_FLUSH, true);
+ conf.setLong(HConstants.HREGION_MEMSTORE_COLUMNFAMILY_FLUSH_SIZE, 100*1024);
+
+ // Initialize the HRegion
+ initHRegion("testSelectiveFlushWhenEnabled", conf);
+ // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
+ for (int i = 1; i <= 1200; i++) {
+ region.put(createPut(1, i));
+
+ if (i <= 100) {
+ region.put(createPut(2, i));
+ if (i <= 50) {
+ region.put(createPut(3, i));
+ }
+ }
+ }
+
+ long totalMemstoreSize = region.getMemstoreSize().get();
+
+ // Find the smallest LSN for the edits of each CF.
+ long smallestSeqCF1 =
+ region.getStore(FAMILY1).getSmallestSeqNumberInMemstore();
+ long smallestSeqCF2 =
+ region.getStore(FAMILY2).getSmallestSeqNumberInMemstore();
+ long smallestSeqCF3 =
+ region.getStore(FAMILY3).getSmallestSeqNumberInMemstore();
+
+ // Find the sizes of the memstores of each CF.
+ long cf1MemstoreSize =
+ region.getStore(FAMILY1).getMemStoreSize();
+ long cf2MemstoreSize =
+ region.getStore(FAMILY2).getMemStoreSize();
+ long cf3MemstoreSize =
+ region.getStore(FAMILY3).getMemStoreSize();
+
+ // Get the overall smallest LSN in the region's memstores.
+ long smallestSeqInRegionCurrentMemstore = region.getLog()
+ .getEarliestMemstoreSeqNum(
+ region.getRegionInfo().getEncodedNameAsBytes());
+
+ // The overall smallest LSN in the region's memstores should be the same as
+ // the LSN of the smallest edit in CF1
+ assertEquals(smallestSeqCF1, smallestSeqInRegionCurrentMemstore);
+
+ // Some other sanity checks.
+ assertTrue(smallestSeqCF1 < smallestSeqCF2);
+ assertTrue(smallestSeqCF2 < smallestSeqCF3);
+ assertTrue(cf1MemstoreSize > 0);
+ assertTrue(cf2MemstoreSize > 0);
+ assertTrue(cf3MemstoreSize > 0);
+
+ // The total memstore size should be the same as the sum of the sizes of
+ // memstores of CF1, CF2 and CF3.
+ assertTrue(totalMemstoreSize ==
+ (cf1MemstoreSize + cf2MemstoreSize + cf3MemstoreSize));
+
+ // Flush!
+ region.flushcache(true);
+
+ // Will use these to check if anything changed.
+ long oldCF2MemstoreSize = cf2MemstoreSize;
+ long oldCF3MemstoreSize = cf3MemstoreSize;
+
+ // Recalculate everything
+ cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
+ cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
+ cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
+ totalMemstoreSize = region.getMemstoreSize().get();
+ smallestSeqInRegionCurrentMemstore = region.getLog()
+ .getEarliestMemstoreSeqNum(
+ region.getRegionInfo().getEncodedNameAsBytes());
+
+ // We should have cleared out only CF1, since we chose the flush thresholds
+ // and number of puts accordingly.
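+ // CF1 received 1200 puts and should be the only family whose memstore
+ // is above the 100KB per-family flush threshold; CF2 (100 puts) and
+ // CF3 (50 puts) should be below it.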
+ assertEquals(0, cf1MemstoreSize);
+ // Nothing should have happened to CF2, ...
+ assertTrue(cf2MemstoreSize == oldCF2MemstoreSize);
+ // ... or CF3
+ assertTrue(cf3MemstoreSize == oldCF3MemstoreSize);
+ // Now the smallest LSN in the region should be the same as the smallest
+ // LSN in the memstore of CF2.
+ assertTrue(smallestSeqInRegionCurrentMemstore == smallestSeqCF2);
+ // Of course, this should hold too.
+ assertTrue(totalMemstoreSize == (cf2MemstoreSize + cf3MemstoreSize));
+
+ // Now add more puts (mostly for CF2), so that we only flush CF2 this time.
+ for (int i = 1200; i < 2400; i++) {
+ region.put(createPut(2, i));
+
+ // Add only 100 puts for CF3
+ if (i - 1200 < 100) {
+ region.put(createPut(3, i));
+ }
+ }
+
+ // How much does the CF3 memstore occupy? Will be used later.
+ oldCF3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
+
+ // Flush again
+ region.flushcache(true);
+
+ // Recalculate everything
+ cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
+ cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
+ cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
+ totalMemstoreSize = region.getMemstoreSize().get();
+ smallestSeqInRegionCurrentMemstore = region.getLog()
+ .getEarliestMemstoreSeqNum(
+ region.getRegionInfo().getEncodedNameAsBytes());
+
+ // CF1 and CF2, both should be absent.
+ assertEquals(0, cf1MemstoreSize);
+ assertEquals(0, cf2MemstoreSize);
+ // CF3 shouldn't have been touched.
+ assertTrue(cf3MemstoreSize == oldCF3MemstoreSize);
+ assertTrue(totalMemstoreSize == cf3MemstoreSize);
+ assertTrue(smallestSeqInRegionCurrentMemstore == smallestSeqCF3);
+
+ // What happens when we hit the memstore limit, but we are not able to find
+ // any Column Family above the threshold?
+ // In that case, we should flush all the CFs.
+
+ // Clearing the existing memstores.
+ region.flushcache(false);
+
+ // The memstore limit is 200*1024 and the column family flush threshold is
+ // 100*1024. We try to just hit the memstore limit with each CF's
+ // memstore being below the CF flush threshold.
+ for (int i = 1; i <= 300; i++) {
+ region.put(createPut(1, i));
+ region.put(createPut(2, i));
+ region.put(createPut(3, i));
+ region.put(createPut(4, i));
+ region.put(createPut(5, i));
+ }
+
+ region.flushcache(true);
+ // Since we won't find any CF above the threshold, and hence no specific
+ // store to flush, we should flush all the memstores.
+ assertEquals(0, region.getMemstoreSize().get());
+ }
+
+ @Test
+ public void testSelectiveFlushWhenNotEnabled() throws IOException {
+ // Set up the configuration
+ Configuration conf = HBaseConfiguration.create();
+ conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 200 * 1024);
+ conf.setBoolean(HConstants.HREGION_MEMSTORE_PER_COLUMN_FAMILY_FLUSH, false);
+ conf.setLong(HConstants.HREGION_MEMSTORE_COLUMNFAMILY_FLUSH_SIZE, 100 * 1024);
+
+ // Initialize the HRegion
+ initHRegion("testSelectiveFlushWhenNotEnabled", conf);
+ // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
+ for (int i = 1; i <= 1200; i++) {
+ region.put(createPut(1, i));
+
+ if (i <= 100) {
+ region.put(createPut(2, i));
+ if (i <= 50) {
+ region.put(createPut(3, i));
+ }
+ }
+ }
+
+ long totalMemstoreSize = region.getMemstoreSize().get();
+
+ // Find the smallest LSN for the edits of each CF.
+ long smallestSeqCF1 =
+ region.getStore(FAMILY1).getSmallestSeqNumberInMemstore();
+ long smallestSeqCF2 =
+ region.getStore(FAMILY2).getSmallestSeqNumberInMemstore();
+ long smallestSeqCF3 =
+ region.getStore(FAMILY3).getSmallestSeqNumberInMemstore();
+
+ // Find the sizes of the memstores of each CF.
+ long cf1MemstoreSize =
+ region.getStore(FAMILY1).getMemStoreSize();
+ long cf2MemstoreSize =
+ region.getStore(FAMILY2).getMemStoreSize();
+ long cf3MemstoreSize =
+ region.getStore(FAMILY3).getMemStoreSize();
+
+ // Get the overall smallest LSN in the region's memstores.
+ long smallestSeqInRegionCurrentMemstore = region.getLog()
+ .getEarliestMemstoreSeqNum(
+ region.getRegionInfo().getEncodedNameAsBytes());
+
+ // The overall smallest LSN in the region's memstores should be the same as
+ // the LSN of the smallest edit in CF1
+ assertEquals(smallestSeqCF1, smallestSeqInRegionCurrentMemstore);
+
+ // Some other sanity checks.
+ assertTrue(smallestSeqCF1 < smallestSeqCF2);
+ assertTrue(smallestSeqCF2 < smallestSeqCF3);
+ assertTrue(cf1MemstoreSize > 0);
+ assertTrue(cf2MemstoreSize > 0);
+ assertTrue(cf3MemstoreSize > 0);
+
+ // The total memstore size should be the same as the sum of the sizes of
+ // memstores of CF1, CF2 and CF3.
+ assertTrue(totalMemstoreSize ==
+ (cf1MemstoreSize + cf2MemstoreSize + cf3MemstoreSize));
+
+ // Flush!
+ region.flushcache(true);
+
+ cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
+ cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
+ cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
+ totalMemstoreSize = region.getMemstoreSize().get();
+ smallestSeqInRegionCurrentMemstore = region.getLog()
+ .getEarliestMemstoreSeqNum(
+ region.getRegionInfo().getEncodedNameAsBytes());
+
+ // Everything should have been cleared
+ assertEquals(0, cf1MemstoreSize);
+ assertEquals(0, cf2MemstoreSize);
+ assertEquals(0, cf3MemstoreSize);
+ assertEquals(0, totalMemstoreSize);
+ assertEquals(HConstants.NO_SEQNUM, smallestSeqInRegionCurrentMemstore);
+ }
+
+ // Find the first region of the specified table, along with the region
+ // server hosting it.
+ private Pair<HRegion, HRegionServer> getRegionWithName(TableName tableName) {
+ MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
+ List<JVMClusterUtil.RegionServerThread> rsts =
+ cluster.getRegionServerThreads();
+ for (int i = 0; i < rsts.size(); i++) {
+ HRegionServer hrs = rsts.get(i).getRegionServer();
+ for (HRegion region: hrs.getOnlineRegions(tableName)) {
+ return Pair.newPair(region, hrs);
+ }
+ }
+ return null;
+ }
+
+ @Test
+ public void testLogReplay() throws Exception {
+ Configuration conf = TEST_UTIL.getConfiguration();
+ conf.setBoolean(HConstants.HREGION_MEMSTORE_PER_COLUMN_FAMILY_FLUSH, true);
+ conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 20000);
+ // Carefully chosen limits so that the memstore just flushes when we're done
+ conf.setLong(HConstants.HREGION_MEMSTORE_COLUMNFAMILY_FLUSH_SIZE, 10000);
+ final int numRegionServers = 4;
+ try {
+ TEST_UTIL.startMiniCluster(numRegionServers);
+ } catch (Exception e) {
+ LOG.error("Could not start the mini cluster. Terminating.");
Terminating."); + e.printStackTrace(); + throw e; + } + TEST_UTIL.getHBaseAdmin().createNamespace( + NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build()); + TEST_UTIL.createTable(TABLENAME, families); + HTable table = new HTable(conf, TABLENAME); + HTableDescriptor htd = table.getTableDescriptor(); + + for (byte [] family : families) { + if (!htd.hasFamily(family)) { + htd.addFamily(new HColumnDescriptor(family)); + } + } + + // Add 100 edits for CF1, 20 for CF2, 20 for CF3. + // These will all be interleaved in the log. + for (int i = 1; i <= 80; i++) { + table.put(createPut(1, i)); + if (i <= 10) { + table.put(createPut(2, i)); + table.put(createPut(3, i)); + } + } + table.flushCommits(); + Thread.sleep(1000); + + Pair desiredRegionAndServer = getRegionWithName(TABLENAME); + HRegion desiredRegion =desiredRegionAndServer.getFirst(); + assertTrue("Could not find a region which hosts the new region.", + desiredRegion != null); + + // Flush the region selectively. + desiredRegion.flushcache(true); + + long totalMemstoreSize; + long cf1MemstoreSize, cf2MemstoreSize, cf3MemstoreSize; + totalMemstoreSize = desiredRegion.getMemstoreSize().get(); + + // Find the sizes of the memstores of each CF. + cf1MemstoreSize = desiredRegion.getStore(FAMILY1).getMemStoreSize(); + cf2MemstoreSize = desiredRegion.getStore(FAMILY2).getMemStoreSize(); + cf3MemstoreSize = desiredRegion.getStore(FAMILY3).getMemStoreSize(); + + // CF1 Should have been flushed + assertEquals(0, cf1MemstoreSize); + // CF2 and CF3 shouldn't have been flushed. + assertTrue(cf2MemstoreSize > 0); + assertTrue(cf3MemstoreSize > 0); + assertEquals(totalMemstoreSize, cf2MemstoreSize + cf3MemstoreSize); + + // Wait for the RS report to go across to the master, so that the master + // is aware of which sequence ids have been flushed, before we kill the RS. + // If in production, the RS dies before the report goes across, we will + // safely replay all the edits. + Thread.sleep(2000); + + // Abort the region server where we have the region hosted. + HRegionServer rs = desiredRegionAndServer.getSecond(); + rs.abort("testing"); + + // The aborted region server's regions will be eventually assigned to some + // other region server, and the get RPC call (inside verifyEdit()) will + // retry for some time till the regions come back up. + + // Verify that all the edits are safe. + for (int i = 1; i <= 80; i++) { + verifyEdit(1, i, table); + if (i <= 10) { + verifyEdit(2, i, table); + verifyEdit(3, i, table); + } + } + + try { + TEST_UTIL.shutdownMiniCluster(); + } catch (Exception e) { + LOG.error("Could not shutdown the mini cluster. Terminating."); + e.printStackTrace(); + throw e; + } + } + + // Test Log Replay with Distributed Splitting on. + // In distributed log splitting, the log splitters ask the master for the + // last flushed sequence id for a region. This test would ensure that we + // are doing the book-keeping correctly. + @Test + public void testLogReplayWithDistributedSplitting() throws Exception { + TEST_UTIL.getConfiguration().setBoolean( + HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true); + testLogReplay(); + } + + /** + * When a log roll is about to happen, we do a flush of the regions who will + * be affected by the log roll. These flushes cannot be a selective flushes, + * otherwise we cannot roll the logs. This test ensures that we do a + * full-flush in that scenario. 
+ * @throws IOException
+ */
+ @Test
+ public void testFlushingWhenLogRolling() throws Exception {
+ Configuration conf = TEST_UTIL.getConfiguration();
+ conf.setBoolean(HConstants.HREGION_MEMSTORE_PER_COLUMN_FAMILY_FLUSH, true);
+ conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300000);
+ conf.setLong(HConstants.HREGION_MEMSTORE_COLUMNFAMILY_FLUSH_SIZE, 100000);
+
+ // Also, let us try really hard to get a log roll to happen.
+ // Keep the log roll period at 2s.
+ conf.setLong("hbase.regionserver.logroll.period", 2000);
+ // Keep the block size small so that we fill up the log files very fast.
+ conf.setLong("hbase.regionserver.hlog.blocksize", 6144);
+ int maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32);
+
+ final int numRegionServers = 4;
+ try {
+ TEST_UTIL.startMiniCluster(numRegionServers);
+ } catch (Exception e) {
+ LOG.error("Could not start the mini cluster. Terminating.");
+ e.printStackTrace();
+ throw e;
+ }
+ TEST_UTIL.getHBaseAdmin().createNamespace(
+ NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
+ TEST_UTIL.createTable(TABLENAME, families);
+ HTable table = new HTable(conf, TABLENAME);
+ HTableDescriptor htd = table.getTableDescriptor();
+
+ for (byte [] family : families) {
+ if (!htd.hasFamily(family)) {
+ htd.addFamily(new HColumnDescriptor(family));
+ }
+ }
+
+ HRegion desiredRegion = getRegionWithName(TABLENAME).getFirst();
+ assertTrue("Could not find a region which hosts the table.",
+ desiredRegion != null);
+
+ // Add some edits. Most will be for CF1, some for CF2 and CF3.
+ for (int i = 1; i <= 10000; i++) {
+ table.put(createPut(1, i));
+ if (i <= 200) {
+ table.put(createPut(2, i));
+ table.put(createPut(3, i));
+ }
+ table.flushCommits();
+ // Keep adding until we exceed the number of log files, so that we are
+ // able to trigger the cleaning of old log files.
+ int currentNumLogFiles = desiredRegion.getLog().getNumLogFiles();
+ if (currentNumLogFiles > maxLogs) {
+ LOG.info("The number of log files is now: " + currentNumLogFiles +
+ ". Expect a log roll and memstore flush.");
+ break;
+ }
+ }
+ table.close();
+ // Wait for some time till the flush caused by log rolling happens.
+ Thread.sleep(4000);
+
+ // We have artificially created the conditions for a log roll. When a
+ // log roll happens, we should flush all the column families. Testing that
+ // case here.
+
+ // Individual families should have been flushed.
+ assertEquals(0, desiredRegion.getStore(FAMILY1).getMemStoreSize());
+ assertEquals(0, desiredRegion.getStore(FAMILY2).getMemStoreSize());
+ assertEquals(0, desiredRegion.getStore(FAMILY3).getMemStoreSize());
+
+ // And of course, the total memstore should also be clean.
+ assertEquals(0, desiredRegion.getMemstoreSize().get()); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java index 3794ab6..50e41f7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java @@ -790,7 +790,7 @@ public class TestMemStore extends TestCase { for (int newValue = 0; newValue < 1000; newValue++) { for (int row = newValue; row < newValue + 1000; row++) { byte[] rowBytes = Bytes.toBytes(row); - size += memstore.updateColumnValue(rowBytes, FAMILY, qualifier, newValue, ++ts); + size += memstore.updateColumnValue(rowBytes, FAMILY, qualifier, newValue, ++ts, -1L); } } System.out.println("Wrote " + ts + " vals"); @@ -912,7 +912,7 @@ public class TestMemStore extends TestCase { Store s = stores.entrySet().iterator().next().getValue(); edge.setCurrentTimeMillis(1234); - s.add(KeyValueTestUtil.create("r", "f", "q", 100, "v")); + s.add(KeyValueTestUtil.create("r", "f", "q", 100, "v"), -1L); edge.setCurrentTimeMillis(1234 + 100); assertTrue(region.shouldFlush() == false); edge.setCurrentTimeMillis(1234 + 10000); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java index 9efd887..04f0148 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java @@ -206,7 +206,7 @@ public class TestStore { long size = store.memstore.getFlushableSize(); Assert.assertEquals(0, size); LOG.info("Adding some data"); - long kvSize = store.add(new KeyValue(row, family, qf1, 1, (byte[])null)); + long kvSize = store.add(new KeyValue(row, family, qf1, 1, (byte[])null), -1L); size = store.memstore.getFlushableSize(); Assert.assertEquals(kvSize, size); // Flush. Bug #1 from HBASE-10466. Make sure size calculation on failed flush is right. @@ -219,7 +219,7 @@ public class TestStore { } size = store.memstore.getFlushableSize(); Assert.assertEquals(kvSize, size); - store.add(new KeyValue(row, family, qf2, 2, (byte[])null)); + store.add(new KeyValue(row, family, qf2, 2, (byte[])null), -1L); // Even though we add a new kv, we expect the flushable size to be 'same' since we have // not yet cleared the snapshot -- the above flush failed. 
Assert.assertEquals(kvSize, size); @@ -291,9 +291,9 @@ public class TestStore { for (int i = 1; i <= storeFileNum; i++) { LOG.info("Adding some data for the store file #" + i); timeStamp = EnvironmentEdgeManager.currentTimeMillis(); - this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null)); - this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null)); - this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null)); + this.store.add(new KeyValue(row, family, qf1, timeStamp, (byte[]) null), -1L); + this.store.add(new KeyValue(row, family, qf2, timeStamp, (byte[]) null), -1L); + this.store.add(new KeyValue(row, family, qf3, timeStamp, (byte[]) null), -1L); flush(i); edge.incrementTime(sleepTime); } @@ -334,9 +334,9 @@ public class TestStore { int storeFileNum = 4; for (int i = 1; i <= storeFileNum; i++) { LOG.info("Adding some data for the store file #"+i); - this.store.add(new KeyValue(row, family, qf1, i, (byte[])null)); - this.store.add(new KeyValue(row, family, qf2, i, (byte[])null)); - this.store.add(new KeyValue(row, family, qf3, i, (byte[])null)); + this.store.add(new KeyValue(row, family, qf1, i, (byte[])null), -1L); + this.store.add(new KeyValue(row, family, qf2, i, (byte[])null), -1L); + this.store.add(new KeyValue(row, family, qf3, i, (byte[])null), -1L); flush(i); } // after flush; check the lowest time stamp @@ -387,8 +387,8 @@ public class TestStore { public void testEmptyStoreFile() throws IOException { init(this.name.getMethodName()); // Write a store file. - this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null)); - this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null)); + this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), -1L); + this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), -1L); flush(1); // Now put in place an empty store file. Its a little tricky. Have to // do manually with hacked in sequence id. 
@@ -425,12 +425,12 @@ public class TestStore { init(this.name.getMethodName()); //Put data in memstore - this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null)); - this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null)); - this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null)); - this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null)); - this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null)); - this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null)); + this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), -1L); + this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), -1L); + this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null), -1L); + this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null), -1L); + this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null), -1L); + this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null), -1L); //Get result = HBaseTestingUtility.getFromStoreFile(store, @@ -449,20 +449,20 @@ public class TestStore { init(this.name.getMethodName()); //Put data in memstore - this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null)); - this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null)); + this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), -1L); + this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), -1L); //flush flush(1); //Add more data - this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null)); - this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null)); + this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null), -1L); + this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null), -1L); //flush flush(2); //Add more data - this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null)); - this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null)); + this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null), -1L); + this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null), -1L); //flush flush(3); @@ -488,20 +488,20 @@ public class TestStore { init(this.name.getMethodName()); //Put data in memstore - this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null)); - this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null)); + this.store.add(new KeyValue(row, family, qf1, 1, (byte[])null), -1L); + this.store.add(new KeyValue(row, family, qf2, 1, (byte[])null), -1L); //flush flush(1); //Add more data - this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null)); - this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null)); + this.store.add(new KeyValue(row, family, qf3, 1, (byte[])null), -1L); + this.store.add(new KeyValue(row, family, qf4, 1, (byte[])null), -1L); //flush flush(2); //Add more data - this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null)); - this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null)); + this.store.add(new KeyValue(row, family, qf5, 1, (byte[])null), -1L); + this.store.add(new KeyValue(row, family, qf6, 1, (byte[])null), -1L); //Get result = HBaseTestingUtility.getFromStoreFile(store, @@ -543,7 +543,8 @@ public class TestStore { long newValue = 3L; this.store.add(new KeyValue(row, family, qf1, System.currentTimeMillis(), - Bytes.toBytes(oldValue))); + Bytes.toBytes(oldValue)), + -1L); // snapshot the store. 
this.store.snapshot(); @@ -551,10 +552,10 @@ public class TestStore { // add other things: this.store.add(new KeyValue(row, family, qf2, System.currentTimeMillis(), - Bytes.toBytes(oldValue))); + Bytes.toBytes(oldValue)), -1L); // update during the snapshot. - long ret = this.store.updateColumnValue(row, family, qf1, newValue); + long ret = this.store.updateColumnValue(row, family, qf1, newValue, -1L); // memstore should have grown by some amount. Assert.assertTrue(ret > 0); @@ -602,26 +603,26 @@ public class TestStore { size += this.store.add(new KeyValue(Bytes.toBytes("200909091000"), family, qf1, System.currentTimeMillis(), - Bytes.toBytes(newValue))); + Bytes.toBytes(newValue)), -1L); size += this.store.add(new KeyValue(Bytes.toBytes("200909091200"), family, qf1, System.currentTimeMillis(), - Bytes.toBytes(newValue))); + Bytes.toBytes(newValue)), -1L); size += this.store.add(new KeyValue(Bytes.toBytes("200909091300"), family, qf1, System.currentTimeMillis(), - Bytes.toBytes(newValue))); + Bytes.toBytes(newValue)), -1L); size += this.store.add(new KeyValue(Bytes.toBytes("200909091400"), family, qf1, System.currentTimeMillis(), - Bytes.toBytes(newValue))); + Bytes.toBytes(newValue)), -1L); size += this.store.add(new KeyValue(Bytes.toBytes("200909091500"), family, qf1, System.currentTimeMillis(), - Bytes.toBytes(newValue))); + Bytes.toBytes(newValue)), -1L); for ( int i = 0 ; i < 10000 ; ++i) { newValue++; - long ret = this.store.updateColumnValue(row, family, qf1, newValue); - long ret2 = this.store.updateColumnValue(row2, family, qf1, newValue); + long ret = this.store.updateColumnValue(row, family, qf1, newValue, -1L); + long ret2 = this.store.updateColumnValue(row2, family, qf1, newValue, -1L); if (ret != 0) System.out.println("ret: " + ret); if (ret2 != 0) System.out.println("ret2: " + ret2); @@ -655,13 +656,13 @@ public class TestStore { long newValue = 3L; this.store.add(new KeyValue(row, family, qf1, EnvironmentEdgeManager.currentTimeMillis(), - Bytes.toBytes(oldValue))); + Bytes.toBytes(oldValue)), -1L); // snapshot the store. this.store.snapshot(); // update during the snapshot, the exact same TS as the Put (lololol) - long ret = this.store.updateColumnValue(row, family, qf1, newValue); + long ret = this.store.updateColumnValue(row, family, qf1, newValue, -1L); // memstore should have grown by some amount. Assert.assertTrue(ret > 0); @@ -673,11 +674,11 @@ public class TestStore { // now increment again: newValue += 1; - this.store.updateColumnValue(row, family, qf1, newValue); + this.store.updateColumnValue(row, family, qf1, newValue, -1L); // at this point we have a TS=1 in snapshot, and a TS=2 in kvset, so increment again: newValue += 1; - this.store.updateColumnValue(row, family, qf1, newValue); + this.store.updateColumnValue(row, family, qf1, newValue, -1L); // the second TS should be TS=2 or higher., even though 'time=1' right now. 
@@ -700,7 +701,7 @@ public class TestStore { mee.setValue(2); // time goes up slightly newValue += 1; - this.store.updateColumnValue(row, family, qf1, newValue); + this.store.updateColumnValue(row, family, qf1, newValue, -1L); results = HBaseTestingUtility.getFromStoreFile(store, get); Assert.assertEquals(2, results.size()); @@ -733,9 +734,9 @@ public class TestStore { init(name.getMethodName(), conf); LOG.info("Adding some data"); - store.add(new KeyValue(row, family, qf1, 1, (byte[])null)); - store.add(new KeyValue(row, family, qf2, 1, (byte[])null)); - store.add(new KeyValue(row, family, qf3, 1, (byte[])null)); + store.add(new KeyValue(row, family, qf1, 1, (byte[])null), -1L); + store.add(new KeyValue(row, family, qf2, 1, (byte[])null), -1L); + store.add(new KeyValue(row, family, qf3, 1, (byte[])null), -1L); LOG.info("Before flush, we should have no files"); @@ -865,7 +866,7 @@ public class TestStore { List kvList1 = getKeyValueSet(timestamps1,numRows, qf1, family); for (Cell kv : kvList1) { - this.store.add(KeyValueUtil.ensureKeyValue(kv)); + this.store.add(KeyValueUtil.ensureKeyValue(kv), -1L); } this.store.snapshot(); @@ -873,7 +874,7 @@ public class TestStore { List kvList2 = getKeyValueSet(timestamps2,numRows, qf1, family); for(Cell kv : kvList2) { - this.store.add(KeyValueUtil.ensureKeyValue(kv)); + this.store.add(KeyValueUtil.ensureKeyValue(kv), -1L); } List result; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultyHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultyHLog.java index 10ba82f..899f737 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultyHLog.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultyHLog.java @@ -57,7 +57,7 @@ public class FaultyHLog extends FSHLog { super.sync(txid); } @Override - public long appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits, + public TxidAndSeqNum appendNoSync(HRegionInfo info, TableName tableName, WALEdit edits, List clusterIds, final long now, HTableDescriptor htd, AtomicLong sequenceId, boolean isInMemstore, long nonceGroup, long nonce) throws IOException { if (this.ft == FailureType.APPENDNOSYNC) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java index 18c130b..28a4638 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java @@ -503,7 +503,7 @@ public class TestHLog { htd.addFamily(new HColumnDescriptor("column")); log.append(info, tableName, cols, System.currentTimeMillis(), htd, sequenceId); - log.startCacheFlush(info.getEncodedNameAsBytes()); + log.startCacheFlush(info.getEncodedNameAsBytes(), Long.MAX_VALUE, Long.MAX_VALUE, sequenceId); log.completeCacheFlush(info.getEncodedNameAsBytes()); log.close(); Path filename = ((FSHLog) log).computeFilename(); @@ -561,7 +561,7 @@ public class TestHLog { HTableDescriptor htd = new HTableDescriptor(); htd.addFamily(new HColumnDescriptor("column")); log.append(hri, tableName, cols, System.currentTimeMillis(), htd, sequenceId); - log.startCacheFlush(hri.getEncodedNameAsBytes()); + log.startCacheFlush(hri.getEncodedNameAsBytes(), Long.MAX_VALUE, Long.MAX_VALUE, sequenceId); log.completeCacheFlush(hri.getEncodedNameAsBytes()); log.close(); Path filename = ((FSHLog) log).computeFilename(); @@ 
-670,7 +670,7 @@ public class TestHLog { // Flush the first region, we expect to see the first two files getting // archived. We need to append something or writer won't be rolled. addEdits(log, hri2, tableName2, 1, sequenceId); - log.startCacheFlush(hri.getEncodedNameAsBytes()); + log.startCacheFlush(hri.getEncodedNameAsBytes(), Long.MAX_VALUE, Long.MAX_VALUE, sequenceId); log.completeCacheFlush(hri.getEncodedNameAsBytes()); log.rollWriter(); assertEquals(2, ((FSHLog) log).getNumRolledLogFiles()); @@ -679,7 +679,7 @@ public class TestHLog { // since the oldest was completely flushed and the two others only contain // flush information addEdits(log, hri2, tableName2, 1, sequenceId); - log.startCacheFlush(hri2.getEncodedNameAsBytes()); + log.startCacheFlush(hri2.getEncodedNameAsBytes(), Long.MAX_VALUE, Long.MAX_VALUE, sequenceId); log.completeCacheFlush(hri2.getEncodedNameAsBytes()); log.rollWriter(); assertEquals(0, ((FSHLog) log).getNumRolledLogFiles()); @@ -1016,7 +1016,7 @@ public class TestHLog { assertEquals(2, ((FSHLog) hlog).getNumRolledLogFiles()); // add a waledit to table1, and flush the region. addEdits(hlog, hri1, table1, 3, sequenceId1); - flushRegion(hlog, hri1.getEncodedNameAsBytes()); + flushRegion(hlog, hri1.getEncodedNameAsBytes(), sequenceId1); // roll log; all old logs should be archived. hlog.rollWriter(); assertEquals(0, ((FSHLog) hlog).getNumRolledLogFiles()); @@ -1030,7 +1030,7 @@ public class TestHLog { assertEquals(2, ((FSHLog) hlog).getNumRolledLogFiles()); // add edits for table2, and flush hri1. addEdits(hlog, hri2, table2, 2, sequenceId2); - flushRegion(hlog, hri1.getEncodedNameAsBytes()); + flushRegion(hlog, hri1.getEncodedNameAsBytes(), sequenceId1); // the log : region-sequenceId map is // log1: region2 (unflushed) // log2: region1 (flushed) @@ -1040,7 +1040,7 @@ public class TestHLog { assertEquals(2, ((FSHLog) hlog).getNumRolledLogFiles()); // flush region2, and all logs should be archived. addEdits(hlog, hri2, table2, 2, sequenceId2); - flushRegion(hlog, hri2.getEncodedNameAsBytes()); + flushRegion(hlog, hri2.getEncodedNameAsBytes(), sequenceId2); hlog.rollWriter(); assertEquals(0, ((FSHLog) hlog).getNumRolledLogFiles()); } finally { @@ -1093,12 +1093,12 @@ public class TestHLog { assertEquals(hri1.getEncodedNameAsBytes(), regionsToFlush[0]); // flush region 1, and roll the wal file. Only last wal which has entries for region1 should // remain. - flushRegion(hlog, hri1.getEncodedNameAsBytes()); + flushRegion(hlog, hri1.getEncodedNameAsBytes(), sequenceId1); hlog.rollWriter(); // only one wal should remain now (that is for the second region). assertEquals(1, ((FSHLog) hlog).getNumRolledLogFiles()); // flush the second region - flushRegion(hlog, hri2.getEncodedNameAsBytes()); + flushRegion(hlog, hri2.getEncodedNameAsBytes(), sequenceId2); hlog.rollWriter(true); // no wal should remain now. assertEquals(0, ((FSHLog) hlog).getNumRolledLogFiles()); @@ -1115,14 +1115,14 @@ public class TestHLog { regionsToFlush = ((FSHLog) hlog).findRegionsToForceFlush(); assertEquals(2, regionsToFlush.length); // flush both regions - flushRegion(hlog, hri1.getEncodedNameAsBytes()); - flushRegion(hlog, hri2.getEncodedNameAsBytes()); + flushRegion(hlog, hri1.getEncodedNameAsBytes(), sequenceId1); + flushRegion(hlog, hri2.getEncodedNameAsBytes(), sequenceId2); hlog.rollWriter(true); assertEquals(0, ((FSHLog) hlog).getNumRolledLogFiles()); // Add an edit to region1, and roll the wal. 
addEdits(hlog, hri1, t1, 2, sequenceId1); // tests partial flush: roll on a partial flush, and ensure that wal is not archived. - hlog.startCacheFlush(hri1.getEncodedNameAsBytes()); + hlog.startCacheFlush(hri1.getEncodedNameAsBytes(), Long.MAX_VALUE, Long.MAX_VALUE, sequenceId1); hlog.rollWriter(); hlog.completeCacheFlush(hri1.getEncodedNameAsBytes()); assertEquals(1, ((FSHLog) hlog).getNumRolledLogFiles()); @@ -1183,8 +1183,8 @@ public class TestHLog { * @param hlog * @param regionEncodedName */ - private void flushRegion(HLog hlog, byte[] regionEncodedName) { - hlog.startCacheFlush(regionEncodedName); + private void flushRegion(HLog hlog, byte[] regionEncodedName, AtomicLong sequenceId) { + hlog.startCacheFlush(regionEncodedName, Long.MAX_VALUE, Long.MAX_VALUE, sequenceId); hlog.completeCacheFlush(regionEncodedName); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java index 5b51eca..95e5a07 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java @@ -526,8 +526,8 @@ public class TestWALReplay { final AtomicInteger countOfRestoredEdits = new AtomicInteger(0); HRegion region3 = new HRegion(basedir, wal3, newFS, newConf, hri, htd, null) { @Override - protected boolean restoreEdit(Store s, KeyValue kv) { - boolean b = super.restoreEdit(s, kv); + protected boolean restoreEdit(Store s, KeyValue kv, long seqNum) { + boolean b = super.restoreEdit(s, kv, seqNum); countOfRestoredEdits.incrementAndGet(); return b; } @@ -776,7 +776,7 @@ public class TestWALReplay { } // Add a cache flush, shouldn't have any effect - wal.startCacheFlush(regionName); + wal.startCacheFlush(regionName, Long.MAX_VALUE, Long.MAX_VALUE, sequenceId); wal.completeCacheFlush(regionName); // Add an edit to another family, should be skipped. @@ -931,9 +931,9 @@ public class TestWALReplay { private HRegion r; @Override - public void requestFlush(HRegion region) { + public void requestFlush(HRegion region, boolean selectiveFlushRequest) { try { - r.flushcache(); + r.flushcache(selectiveFlushRequest); } catch (IOException e) { throw new RuntimeException("Exception flushing", e); } -- 1.9.1