commit bf10597856516d4e5d91a30d0e5b69e8be3ac585 Author: stack Date: Tue Jun 2 15:56:41 2015 -0700 HBASE-13811 Splitting WALs, we are filtering out too many edits -> DATALOSS modified: hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java Fixed logging to not include regioninfo (no other logs in here do) and change references to SSH to ServerCrashProcedure modified: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java Tighten up the log message; too verbose modified: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tighter log messages. Making two line messages, single line. Log encoded name instead of full name. modified: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java Attempt at better explaining what is going on around a flush. Gave core Maps more descriptive names. Put all accesses under regionSequenceIdLock. Actual fix is changing getEarliestMemstoreSeqNum to look in flushing Map first before looking in unflushed. modified: hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java Logging cleanup. Log less but with more density. 
Added info to help debug diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java index 556a143..0ae3a00 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java @@ -645,7 +645,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements lastSequenceId = lastRecordedFlushedSequenceId; } ZKUtil.createSetData(this.watcher, nodePath, - ZKUtil.regionSequenceIdsToByteArray(lastSequenceId, null)); + ZKUtil.regionSequenceIdsToByteArray(lastSequenceId, null)); if (LOG.isDebugEnabled()) { LOG.debug("Marked " + regionEncodeName + " recovering from " + serverName + ": " + nodePath); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java index 58a8260..c658475 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java @@ -453,7 +453,7 @@ public class RegionStates { ServerName oldServerName = regionAssignments.put(hri, serverName); if (!serverName.equals(oldServerName)) { if (LOG.isDebugEnabled()) { - LOG.debug("Onlined " + hri.getShortNameToLog() + " on " + serverName + " " + hri); + LOG.debug("Onlined " + hri.getShortNameToLog() + " on " + serverName); } else { LOG.debug("Onlined " + hri.getShortNameToLog() + " on " + serverName); } @@ -644,7 +644,7 @@ public class RegionStates { // Region is open on this region server, but in transition. // This region must be moving away from this server, or splitting/merging. // SSH will handle it, either skip assigning, or re-assign. 
- LOG.info("Transitioning " + state + " will be handled by SSH for " + sn); + LOG.info("Transitioning " + state + " will be handled by ServerCrashProcedure for " + sn); } else if (sn.equals(state.getServerName())) { // Region is in transition on this region server, and this // region is not open on this server. So the region must be @@ -654,7 +654,8 @@ public class RegionStates { // transition. The region could be in failed_close state too if we have // tried several times to open it while this region server is not reachable) if (state.isPendingOpenOrOpening() || state.isFailedClose() || state.isOffline()) { - LOG.info("Found region in " + state + " to be reassigned by SSH for " + sn); + LOG.info("Found region in " + state + + " to be reassigned by ServerCrashProcedure for " + sn); rits.add(hri); } else if(state.isSplittingNew()) { regionsToCleanIfNoMetaEntry.add(state.getRegion()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java index 7e0e54c..328e890 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java @@ -76,9 +76,9 @@ public class FlushLargeStoresPolicy extends FlushPolicy { private boolean shouldFlush(Store store) { if (store.getMemStoreSize() > this.flushSizeLowerBound) { if (LOG.isDebugEnabled()) { - LOG.debug("Column Family: " + store.getColumnFamilyName() + " of region " + region - + " will be flushed because of memstoreSize(" + store.getMemStoreSize() - + ") is larger than lower bound(" + this.flushSizeLowerBound + ")"); + LOG.debug("Flush Column Family " + store.getColumnFamilyName() + " of " + + region.getRegionInfo().getEncodedName() + " because memstoreSize=" + + store.getMemStoreSize() + " > lower bound=" + this.flushSizeLowerBound); } return true; } diff 
--git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 94a193f..ca809a8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -1912,27 +1912,25 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * returns true which will make a lot of flush requests. */ boolean shouldFlushStore(Store store) { - long maxFlushedSeqId = - this.wal.getEarliestMemstoreSeqNum(getRegionInfo().getEncodedNameAsBytes(), store - .getFamily().getName()) - 1; - if (maxFlushedSeqId > 0 && maxFlushedSeqId + flushPerChanges < sequenceId.get()) { + long unflushed = this.wal.getEarliestMemstoreSeqNum(getRegionInfo().getEncodedNameAsBytes(), + store.getFamily().getName()) - 1; + if (unflushed > 0 && unflushed + flushPerChanges < sequenceId.get()) { if (LOG.isDebugEnabled()) { - LOG.debug("Column Family: " + store.getColumnFamilyName() + " of region " + this - + " will be flushed because its max flushed seqId(" + maxFlushedSeqId - + ") is far away from current(" + sequenceId.get() + "), max allowed is " - + flushPerChanges); + LOG.debug("Flush column family " + store.getColumnFamilyName() + " of " + + getRegionInfo().getEncodedName() + " because unflushed sequenceid=" + unflushed + + " is > " + this.flushPerChanges + " from current=" + sequenceId.get()); } return true; } - if (flushCheckInterval <= 0) { + if (this.flushCheckInterval <= 0) { return false; } long now = EnvironmentEdgeManager.currentTime(); - if (store.timeOfOldestEdit() < now - flushCheckInterval) { + if (store.timeOfOldestEdit() < now - this.flushCheckInterval) { if (LOG.isDebugEnabled()) { - LOG.debug("Column Family: " + store.getColumnFamilyName() + " of region " + this - + " will be flushed because time of its oldest edit (" + store.timeOfOldestEdit() - + ") is far 
away from now(" + now + "), max allowed is " + flushCheckInterval); + LOG.debug("Flush column family: " + store.getColumnFamilyName() + " of " + + getRegionInfo().getEncodedName() + " because time of oldest edit=" + + store.timeOfOldestEdit() + " is > " + this.flushCheckInterval + " from now =" + now); } return true; } @@ -2086,18 +2084,21 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } if (LOG.isInfoEnabled()) { - LOG.info("Started memstore flush for " + this + ", current region memstore size " - + StringUtils.byteDesc(this.memstoreSize.get()) + ", and " + storesToFlush.size() + "/" - + stores.size() + " column families' memstores are being flushed." - + ((wal != null) ? "" : "; wal is null, using passed sequenceid=" + myseqid)); - // only log when we are not flushing all stores. + // Log a fat line detailing what is being flushed. + StringBuffer perCfExtras = null; if (this.stores.size() > storesToFlush.size()) { + perCfExtras = new StringBuffer(); for (Store store: storesToFlush) { - LOG.info("Flushing Column Family: " + store.getColumnFamilyName() - + " which was occupying " - + StringUtils.byteDesc(store.getMemStoreSize()) + " of memstore."); + perCfExtras.append("; "); + perCfExtras.append(store.getColumnFamilyName()); + perCfExtras.append("="); + perCfExtras.append(StringUtils.byteDesc(store.getMemStoreSize())); } } + LOG.info("Flushing " + + storesToFlush.size() + "/" + stores.size() + + " column families, memstore=" + StringUtils.byteDesc(this.memstoreSize.get()) + + ((perCfExtras != null && perCfExtras.length() > 0)? perCfExtras.toString(): "") + + ((wal != null) ? "" : "; WAL is null, using passed sequenceid=" + myseqid)); } // Stop updates while we snapshot the memstore of all of these regions' stores. We only have // to do this for a moment. It is quick. 
We also set the memstore size to zero here before we @@ -3704,7 +3705,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // Make request outside of synchronize block; HBASE-818. this.rsServices.getFlushRequester().requestFlush(this, false); if (LOG.isDebugEnabled()) { - LOG.debug("Flush requested on " + this); + LOG.debug("Flush requested on " + this.getRegionInfo().getEncodedName()); } } @@ -7840,8 +7841,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi @Override public long getOldestSeqIdOfStore(byte[] familyName) { - return wal.getEarliestMemstoreSeqNum(getRegionInfo() - .getEncodedNameAsBytes(), familyName); + return wal.getEarliestMemstoreSeqNum(getRegionInfo().getEncodedNameAsBytes(), familyName); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 98dab42..0c6b2f0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -111,16 +111,6 @@ import com.google.common.collect.Sets; * services is compaction services where files are aggregated once they pass * a configurable threshold. * - *

The only thing having to do with logs that Store needs to deal with is - * the reconstructionLog. This is a segment of an HRegion's log that might - * NOT be present upon startup. If the param is NULL, there's nothing to do. - * If the param is non-NULL, we need to process the log to reconstruct - * a TreeMap that might not have been written to disk before the process - * died. - * - *

It's assumed that after this constructor returns, the reconstructionLog - * file will be deleted (by whoever has instantiated the Store). - * *

Locking and transactions are handled at a higher level. This API should * not be called directly but by an HRegion manager. */ @@ -898,8 +888,7 @@ public class HStore implements Store { } /** - * Write out current snapshot. Presumes {@link #snapshot()} has been called - * previously. + * Write out current snapshot. Presumes {@link #snapshot()} has been called previously. * @param logCacheFlushId flush sequence number * @param snapshot * @param status diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java index 8910042..274f09a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java @@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServic import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay; +import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.Message; import com.google.protobuf.RpcController; import com.google.protobuf.Service; @@ -127,6 +128,7 @@ public interface Region extends ConfigurationObserver { long getMaxFlushedSeqId(); /** @return the oldest sequence id found in the store for the given family */ + @VisibleForTesting public long getOldestSeqIdOfStore(byte[] familyName); /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index aa722a0..ca935dd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -66,7 +66,6 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import 
org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.Bytes; @@ -334,38 +333,58 @@ public class FSHLog implements WAL { private final AtomicInteger closeErrorCount = new AtomicInteger(); - // Region sequence id accounting across flushes and for knowing when we can GC a WAL. These - // sequence id numbers are by region and unrelated to the ring buffer sequence number accounting - // done above in failedSequence, highest sequence, etc. + // Region sequence id accounting across flushes and for knowing when we can GC a WAL. + /** * This lock ties all operations on lowestFlushingStoreSequenceIds and - * oldestUnflushedStoreSequenceIds Maps with the exception of append's putIfAbsent call into - * oldestUnflushedStoreSequenceIds. We use these Maps to find out the low bound regions - * sequence id, or to find regions with old sequence ids to force flush; we are interested in - * old stuff not the new additions (TODO: IS THIS SAFE? CHECK!). + * oldestUnflushedStoreSequenceIds Maps. {@link #unflushedOldestStoreSequenceIds} has the + * oldest outstanding sequence ids EXCEPT when we are flushing. When we flush, the current + * oldest set are moved (atomically because of this lock) to + * {@link #flushingOldestStoreSequenceIds}. To find oldest sequence id then, you need to look + * first in {@link #flushingOldestStoreSequenceIds} in case there is an ongoing flush, and then + * look in {@link #unflushedOldestStoreSequenceIds}. + * + *

The two Maps are tied by this locking object except when we call + * {@link FSHLog#append(HTableDescriptor, HRegionInfo, WALKey, WALEdit, AtomicLong, boolean, + * List)}. In here is a putIfAbsent call on {@link #unflushedOldestStoreSequenceIds}. In this + * latter case, the current append edit will add its sequence id if it finds that there is no + * entry for the current column family. There will be no entry only if we just came up OR we have + * moved aside current set of oldest sequence ids because the current set are being flushed + * (by putting them into {@link #flushingOldestStoreSequenceIds}). This is how we get the + * 'oldest' sequence id per region per column family. */ private final Object regionSequenceIdLock = new Object(); /** * Map of encoded region names and family names to their OLDEST -- i.e. their first, * the longest-lived -- sequence id in memstore. Note that this sequence id is the region - * sequence id. This is not related to the id we use above for {@link #highestSyncedSequence} + * sequence id. + * + *

When we flush, the current oldest sequenceids get cleared and added to + * {@link #flushingOldestStoreSequenceIds}. The next append that comes in, is then added + * here to {@link #unflushedOldestStoreSequenceIds} as the next oldest sequenceid. + * + *

If flush fails, currently server is aborted so no need to restore old sequenceids. + * + *

This sequenceid is not related to the id we use above for {@link #highestSyncedSequence} * and {@link #highestUnsyncedSequence} which is the sequence from the disruptor * ring buffer. */ - private final ConcurrentMap> oldestUnflushedStoreSequenceIds + private final ConcurrentMap> unflushedOldestStoreSequenceIds = new ConcurrentSkipListMap>( Bytes.BYTES_COMPARATOR); /** * Map of encoded region names and family names to their lowest or OLDEST sequence/edit id in * memstore currently being flushed out to hfiles. Entries are moved here from - * {@link #oldestUnflushedStoreSequenceIds} while the lock {@link #regionSequenceIdLock} is held - * (so movement between the Maps is atomic). This is not related to the id we use above for + * {@link #unflushedOldestStoreSequenceIds} while the lock {@link #regionSequenceIdLock} is held + * (so movement between the Maps is atomic). + * + *

This is not related to the id we use above for * {@link #highestSyncedSequence} and {@link #highestUnsyncedSequence} which is the sequence from * the disruptor ring buffer, an internal detail. */ - private final Map> lowestFlushingStoreSequenceIds = + private final Map> flushingOldestStoreSequenceIds = new TreeMap>(Bytes.BYTES_COMPARATOR); /** @@ -745,26 +764,36 @@ public class FSHLog implements WAL { return DefaultWALProvider.createWriter(conf, fs, path, false); } - private long getLowestSeqId(Map seqIdMap) { - long result = HConstants.NO_SEQNUM; - for (Long seqNum: seqIdMap.values()) { - if (result == HConstants.NO_SEQNUM || seqNum.longValue() < result) { - result = seqNum.longValue(); + /** + * @param sequenceids Map to search for lowest value. + * @return Lowest value found in sequenceids. + */ + private long getLowestSequenceId(Map sequenceids) { + long lowest = HConstants.NO_SEQNUM; + for (Long sid: sequenceids.values()) { + if (lowest == HConstants.NO_SEQNUM || sid.longValue() < lowest) { + lowest = sid.longValue(); } } - return result; + return lowest; } - private > Map copyMapWithLowestSeqId( - Map mapToCopy) { - Map copied = Maps.newHashMap(); - for (Map.Entry entry: mapToCopy.entrySet()) { - long lowestSeqId = getLowestSeqId(entry.getValue()); + /** + * @param src + * @return New Map that has same keys as src but instead of a Map value, it instead + * has found in the Map value, the smallest sequence id and it returns that as the value + * instead. 
+ */ + private > Map getLowestSequenceIdMap( + Map src) { + Map tgt = Maps.newHashMap(); + for (Map.Entry entry: src.entrySet()) { + long lowestSeqId = getLowestSequenceId(entry.getValue()); if (lowestSeqId != HConstants.NO_SEQNUM) { - copied.put(entry.getKey(), lowestSeqId); + tgt.put(entry.getKey(), lowestSeqId); } } - return copied; + return tgt; } /** @@ -779,23 +808,21 @@ public class FSHLog implements WAL { * @throws IOException */ private void cleanOldLogs() throws IOException { - Map lowestFlushingRegionSequenceIdsLocal = null; - Map oldestUnflushedRegionSequenceIdsLocal = null; + Map flushing = null; + Map unflushed = null; List logsToArchive = new ArrayList(); // make a local copy so as to avoid locking when we iterate over these maps. - synchronized (regionSequenceIdLock) { - lowestFlushingRegionSequenceIdsLocal = - copyMapWithLowestSeqId(this.lowestFlushingStoreSequenceIds); - oldestUnflushedRegionSequenceIdsLocal = - copyMapWithLowestSeqId(this.oldestUnflushedStoreSequenceIds); + synchronized (this.regionSequenceIdLock) { + flushing = getLowestSequenceIdMap(this.flushingOldestStoreSequenceIds); + unflushed = getLowestSequenceIdMap(this.unflushedOldestStoreSequenceIds); } for (Map.Entry> e : byWalRegionSequenceIds.entrySet()) { // iterate over the log file. Path log = e.getKey(); Map sequenceNums = e.getValue(); // iterate over the map for this log file, and tell whether it should be archive or not. - if (areAllRegionsFlushed(sequenceNums, lowestFlushingRegionSequenceIdsLocal, - oldestUnflushedRegionSequenceIdsLocal)) { + if (areAllRegionsFlushed(sequenceNums, flushing, + unflushed)) { logsToArchive.add(log); LOG.debug("WAL file ready for archiving " + log); } @@ -850,7 +877,9 @@ public class FSHLog implements WAL { // Keeping the old behavior of iterating unflushedSeqNums under oldestSeqNumsLock. 
synchronized (regionSequenceIdLock) { for (Map.Entry e: regionsSequenceNums.entrySet()) { - long unFlushedVal = getEarliestMemstoreSeqNum(e.getKey()); + Map m = this.unflushedOldestStoreSequenceIds.get(e.getKey()); + if (m == null) continue; + long unFlushedVal = getLowestSequenceId(m); if (unFlushedVal != HConstants.NO_SEQNUM && unFlushedVal <= e.getValue()) { if (regionsToFlush == null) regionsToFlush = new ArrayList(); @@ -858,8 +887,8 @@ public class FSHLog implements WAL { } } } - return regionsToFlush == null ? null : regionsToFlush - .toArray(new byte[][] { HConstants.EMPTY_BYTE_ARRAY }); + return regionsToFlush == null? null: + regionsToFlush.toArray(new byte[][] { HConstants.EMPTY_BYTE_ARRAY }); } /** @@ -1631,36 +1660,36 @@ public class FSHLog implements WAL { } @Override - public boolean startCacheFlush(final byte[] encodedRegionName, - Set flushedFamilyNames) { - Map oldStoreSeqNum = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); + public boolean startCacheFlush(final byte[] encodedRegionName, Set flushedFamilyNames) { if (!closeBarrier.beginOp()) { - LOG.info("Flush will not be started for " + Bytes.toString(encodedRegionName) + - " - because the server is closing."); + LOG.info("Flush not started for " + Bytes.toString(encodedRegionName) + "; server closing."); return false; } + Map oldStoreSeqNum = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); synchronized (regionSequenceIdLock) { - ConcurrentMap oldestUnflushedStoreSequenceIdsOfRegion = - oldestUnflushedStoreSequenceIds.get(encodedRegionName); - if (oldestUnflushedStoreSequenceIdsOfRegion != null) { + ConcurrentMap m = this.unflushedOldestStoreSequenceIds.get(encodedRegionName); + if (m != null) { + // NOTE: We can only remove if updates are blocked by write lock (they are in the caller). + // See in append where it will add itself to the unflushed Map if it finds no entry for its + // family. 
for (byte[] familyName: flushedFamilyNames) { - Long seqId = oldestUnflushedStoreSequenceIdsOfRegion.remove(familyName); - if (seqId != null) { - oldStoreSeqNum.put(familyName, seqId); - } + Long seqId = m.remove(familyName); + if (seqId != null) oldStoreSeqNum.put(familyName, seqId); } if (!oldStoreSeqNum.isEmpty()) { - Map oldValue = this.lowestFlushingStoreSequenceIds.put( + Map oldValue = this.flushingOldestStoreSequenceIds.put( encodedRegionName, oldStoreSeqNum); - assert oldValue == null: "Flushing map not cleaned up for " - + Bytes.toString(encodedRegionName); + if (oldValue != null) { + LOG.warn("Flushing Map not cleaned up for " + Bytes.toString(encodedRegionName) + + ", sequenceid=" + oldStoreSeqNum); + } } - if (oldestUnflushedStoreSequenceIdsOfRegion.isEmpty()) { + if (m.isEmpty()) { // Remove it otherwise it will be in oldestUnflushedStoreSequenceIds for ever // even if the region is already moved to other server. // Do not worry about data racing, we held write lock of region when calling // startCacheFlush, so no one can add value to the map we removed. - oldestUnflushedStoreSequenceIds.remove(encodedRegionName); + this.unflushedOldestStoreSequenceIds.remove(encodedRegionName); } } } @@ -1670,8 +1699,7 @@ public class FSHLog implements WAL { // were no appends after last flush, so why are we starting flush? Maybe we should // assert not empty. Less rigorous, but safer, alternative is telling the caller to stop. // For now preserve old logic. 
- LOG.warn("Couldn't find oldest seqNum for the region we are about to flush: [" - + Bytes.toString(encodedRegionName) + "]"); + LOG.warn("Couldn't find oldest sequenceid for " + Bytes.toString(encodedRegionName)); } return true; } @@ -1679,55 +1707,52 @@ public class FSHLog implements WAL { @Override public void completeCacheFlush(final byte [] encodedRegionName) { synchronized (regionSequenceIdLock) { - this.lowestFlushingStoreSequenceIds.remove(encodedRegionName); + this.flushingOldestStoreSequenceIds.remove(encodedRegionName); } closeBarrier.endOp(); } private ConcurrentMap getOrCreateOldestUnflushedStoreSequenceIdsOfRegion( byte[] encodedRegionName) { - ConcurrentMap oldestUnflushedStoreSequenceIdsOfRegion = - oldestUnflushedStoreSequenceIds.get(encodedRegionName); - if (oldestUnflushedStoreSequenceIdsOfRegion != null) { - return oldestUnflushedStoreSequenceIdsOfRegion; - } - oldestUnflushedStoreSequenceIdsOfRegion = - new ConcurrentSkipListMap(Bytes.BYTES_COMPARATOR); + // Intentionally, this access is done outside of this.regionSequenceIdLock. Done per append. + ConcurrentMap m = unflushedOldestStoreSequenceIds.get(encodedRegionName); + if (m != null) return m; + m = new ConcurrentSkipListMap(Bytes.BYTES_COMPARATOR); ConcurrentMap alreadyPut = - oldestUnflushedStoreSequenceIds.putIfAbsent(encodedRegionName, - oldestUnflushedStoreSequenceIdsOfRegion); - return alreadyPut == null ? oldestUnflushedStoreSequenceIdsOfRegion : alreadyPut; + this.unflushedOldestStoreSequenceIds.putIfAbsent(encodedRegionName, m); + return alreadyPut == null? m : alreadyPut; } @Override public void abortCacheFlush(byte[] encodedRegionName) { - Map storeSeqNumsBeforeFlushStarts; - Map currentStoreSeqNums = new TreeMap(Bytes.BYTES_COMPARATOR); + // Method is called when we are crashing down because failed write flush AND it is called + // if we fail prepare. The below is for the fail prepare case; we restore the old sequence ids. 
+ Map flushing = null; + Map tmpMap = new TreeMap(Bytes.BYTES_COMPARATOR); + // Here we are moving sequenceids from flushing back to unflushed; doing opposite of what + // happened in startCacheFlush. During prepare phase, we have update lock on the region so + // no edits should be coming in via append. synchronized (regionSequenceIdLock) { - storeSeqNumsBeforeFlushStarts = this.lowestFlushingStoreSequenceIds.remove( - encodedRegionName); - if (storeSeqNumsBeforeFlushStarts != null) { - ConcurrentMap oldestUnflushedStoreSequenceIdsOfRegion = - getOrCreateOldestUnflushedStoreSequenceIdsOfRegion(encodedRegionName); - for (Map.Entry familyNameAndSeqId: storeSeqNumsBeforeFlushStarts - .entrySet()) { - currentStoreSeqNums.put(familyNameAndSeqId.getKey(), - oldestUnflushedStoreSequenceIdsOfRegion.put(familyNameAndSeqId.getKey(), - familyNameAndSeqId.getValue())); + flushing = this.flushingOldestStoreSequenceIds.remove(encodedRegionName); + if (flushing != null) { + Map unflushed = + getOrCreateOldestUnflushedStoreSequenceIdsOfRegion(encodedRegionName); + for (Map.Entry e: flushing.entrySet()) { + // Set into unflushed the 'old' oldest sequenceid and if any value in flushed with this + // value, it will now be in tmpMap. + tmpMap.put(e.getKey(), unflushed.put(e.getKey(), e.getValue())); } } } closeBarrier.endOp(); - if (storeSeqNumsBeforeFlushStarts != null) { - for (Map.Entry familyNameAndSeqId : storeSeqNumsBeforeFlushStarts.entrySet()) { - Long currentSeqNum = currentStoreSeqNums.get(familyNameAndSeqId.getKey()); - if (currentSeqNum != null - && currentSeqNum.longValue() <= familyNameAndSeqId.getValue().longValue()) { - String errorStr = - "Region " + Bytes.toString(encodedRegionName) + " family " - + Bytes.toString(familyNameAndSeqId.getKey()) - + " acquired edits out of order current memstore seq=" + currentSeqNum - + ", previous oldest unflushed id=" + familyNameAndSeqId.getValue(); + // Here we are doing some 'test' to see if edits are going in out of order. 
What is it for? + if (flushing != null) { + for (Map.Entry e : flushing.entrySet()) { + Long currentId = tmpMap.get(e.getKey()); + if (currentId != null && currentId.longValue() <= e.getValue().longValue()) { + String errorStr = Bytes.toString(encodedRegionName) + " family " + + Bytes.toString(e.getKey()) + " acquired edits out of order current memstore seq=" + + currentId + ", previous oldest unflushed id=" + e.getValue(); LOG.error(errorStr); Runtime.getRuntime().halt(1); } @@ -1762,23 +1787,40 @@ public class FSHLog implements WAL { @Override public long getEarliestMemstoreSeqNum(byte[] encodedRegionName) { - ConcurrentMap oldestUnflushedStoreSequenceIdsOfRegion = - this.oldestUnflushedStoreSequenceIds.get(encodedRegionName); - return oldestUnflushedStoreSequenceIdsOfRegion != null ? - getLowestSeqId(oldestUnflushedStoreSequenceIdsOfRegion) : HConstants.NO_SEQNUM; + // This method is called from two places; when we are preparing to flush and when we are + // reporting oldest sequenceid for a region we are closing (it is also used by tests). In both + // runtime cases, there should be no entry in flushingOldestStoreSequenceIds but to be + // 'correct', we will check it first. + synchronized (this.regionSequenceIdLock) { + Map m = this.flushingOldestStoreSequenceIds.get(encodedRegionName); + if (m != null) return getLowestSequenceId(m); + // If no current flush going on, then go to the unflushed Map. + m = this.unflushedOldestStoreSequenceIds.get(encodedRegionName); + return m != null? getLowestSequenceId(m): HConstants.NO_SEQNUM; + } } @Override - public long getEarliestMemstoreSeqNum(byte[] encodedRegionName, - byte[] familyName) { - ConcurrentMap oldestUnflushedStoreSequenceIdsOfRegion = - this.oldestUnflushedStoreSequenceIds.get(encodedRegionName); - if (oldestUnflushedStoreSequenceIdsOfRegion != null) { - Long result = oldestUnflushedStoreSequenceIdsOfRegion.get(familyName); - return result != null ? 
result.longValue() : HConstants.NO_SEQNUM; - } else { - return HConstants.NO_SEQNUM; + public long getEarliestMemstoreSeqNum(byte[] encodedRegionName, byte[] familyName) { + // This method is used by tests and for figuring if we should flush or not because our + // sequenceids are old. It is also used reporting the master our oldest sequenceid for use + // figuring what edits can be skipped during log recovery. So, looking first in + // flushingOldestStoreSequenceIds is the right thing to do for the reporting oldest sequenceids + // to master. For figuring what to flush, we might get requeued if our sequence id is old + // even though we are currently flushing. This may mean we do too much flushing. Lets see. + synchronized (this.regionSequenceIdLock) { + Map m = this.flushingOldestStoreSequenceIds.get(encodedRegionName); + if (m != null) { + Long oldest = m.get(familyName); + if (oldest != null) return oldest; + } + m = this.unflushedOldestStoreSequenceIds.get(encodedRegionName); + if (m != null) { + Long oldest = m.get(familyName); + if (oldest != null) return oldest; + } } + return HConstants.NO_SEQNUM; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java index 5a2b08d..79a1b25 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java @@ -141,30 +141,36 @@ public interface WAL { /** * WAL keeps track of the sequence numbers that were not yet flushed from memstores - * in order to be able to do cleanup. This method tells WAL that some region is about - * to flush memstore. + * in order to be able to do accounting to figure which WALs can be let go. This method tells WAL + * that some region is about to flush memstore. * *

We stash the oldest seqNum for the region, and let the the next edit inserted in this * region be recorded in {@link #append(HTableDescriptor, HRegionInfo, WALKey, WALEdit, * AtomicLong, boolean, List)} as new oldest seqnum. - * In case of flush being aborted, we put the stashed value back; in case of flush succeeding, + * + *

In case of flush being aborted, we put the stashed value back; in case of flush succeeding, * the seqNum of that first edit after start becomes the valid oldest seqNum for this region. + * + *

Currently, it expects that the update lock is held for the region; it will not work if + * appends are going on concurrently. See implementation for detail. * - * @return true if the flush can proceed, false in case wal is closing (ususally, when server is + * @return true if the flush can proceed, false in case wal is closing (usually, when server is * closing) and flush couldn't be started. + * @see #completeCacheFlush(byte[]) */ boolean startCacheFlush(final byte[] encodedRegionName, Set flushedFamilyNames); /** * Complete the cache flush. * @param encodedRegionName Encoded region name. + * @see #startCacheFlush(byte[], Set) */ void completeCacheFlush(final byte[] encodedRegionName); /** * Abort a cache flush. Call if the flush fails. Note that the only recovery * for an aborted flush currently is a restart of the regionserver so the - * snapshot content dropped by the failure gets restored to the memstore.v + * snapshot content dropped by the failure gets restored to the memstore. * @param encodedRegionName Encoded region name. */ void abortCacheFlush(byte[] encodedRegionName); @@ -175,10 +181,10 @@ public interface WAL { WALCoprocessorHost getCoprocessorHost(); - /** Gets the earliest sequence number in the memstore for this particular region. - * This can serve as best-effort "recent" WAL number for this region. + /** + * Gets the earliest sequence number in the memstore for this particular region. * @param encodedRegionName The region to get the number for. - * @return The number if present, HConstants.NO_SEQNUM if absent. + * @return The sequence number if present, HConstants.NO_SEQNUM if absent. 
*/ long getEarliestMemstoreSeqNum(byte[] encodedRegionName); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java index c55280b..f8104d2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java @@ -124,6 +124,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.protobuf.ServiceException; +import com.google.protobuf.TextFormat; /** * This class is responsible for splitting up a bunch of regionserver commit log @@ -324,15 +325,19 @@ public class WALSplitter { failedServerName = (serverName == null) ? "" : serverName.getServerName(); while ((entry = getNextLogLine(in, logPath, skipErrors)) != null) { byte[] region = entry.getKey().getEncodedRegionName(); - String key = Bytes.toString(region); - lastFlushedSequenceId = lastFlushedSequenceIds.get(key); + String encodedRegionNameAsStr = Bytes.toString(region); + lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr); if (lastFlushedSequenceId == null) { if (this.distributedLogReplay) { RegionStoreSequenceIds ids = csm.getSplitLogWorkerCoordination().getRegionFlushedSequenceId(failedServerName, - key); + encodedRegionNameAsStr); if (ids != null) { lastFlushedSequenceId = ids.getLastFlushedSequenceId(); + if (LOG.isDebugEnabled()) { + LOG.debug("DLR Last flushed sequenceid for " + encodedRegionNameAsStr + ": " + + TextFormat.shortDebugString(ids)); + } } } else if (sequenceIdChecker != null) { RegionStoreSequenceIds ids = sequenceIdChecker.getLastSequenceId(region); @@ -341,13 +346,17 @@ public class WALSplitter { maxSeqIdInStores.put(storeSeqId.getFamilyName().toByteArray(), storeSeqId.getSequenceId()); } - regionMaxSeqIdInStores.put(key, maxSeqIdInStores); + 
regionMaxSeqIdInStores.put(encodedRegionNameAsStr, maxSeqIdInStores); lastFlushedSequenceId = ids.getLastFlushedSequenceId(); + if (LOG.isDebugEnabled()) { + LOG.debug("DLS Last flushed sequenceid for " + encodedRegionNameAsStr + ": " + + TextFormat.shortDebugString(ids)); + } } if (lastFlushedSequenceId == null) { lastFlushedSequenceId = -1L; } - lastFlushedSequenceIds.put(key, lastFlushedSequenceId); + lastFlushedSequenceIds.put(encodedRegionNameAsStr, lastFlushedSequenceId); } if (lastFlushedSequenceId >= entry.getKey().getLogSeqNum()) { editsSkipped++; @@ -1071,7 +1080,7 @@ public class WALSplitter { } private void doRun() throws IOException { - LOG.debug("Writer thread " + this + ": starting"); + if (LOG.isTraceEnabled()) LOG.trace("Writer thread starting"); while (true) { RegionEntryBuffer buffer = entryBuffers.getChunkToWrite(); if (buffer == null) { @@ -1226,7 +1235,8 @@ public class WALSplitter { } } controller.checkForErrors(); - LOG.info("Split writers finished"); + LOG.info((this.writerThreads == null? 
0: this.writerThreads.size()) + + " split writers finished; closing..."); return (!progress_failed); } @@ -1317,12 +1327,14 @@ public class WALSplitter { CompletionService completionService = new ExecutorCompletionService(closeThreadPool); for (final Map.Entry writersEntry : writers.entrySet()) { - LOG.debug("Submitting close of " + ((WriterAndPath)writersEntry.getValue()).p); + if (LOG.isTraceEnabled()) { + LOG.trace("Submitting close of " + ((WriterAndPath)writersEntry.getValue()).p); + } completionService.submit(new Callable() { @Override public Void call() throws Exception { WriterAndPath wap = (WriterAndPath) writersEntry.getValue(); - LOG.debug("Closing " + wap.p); + if (LOG.isTraceEnabled()) LOG.trace("Closing " + wap.p); try { wap.w.close(); } catch (IOException ioe) { @@ -1330,8 +1342,8 @@ public class WALSplitter { thrown.add(ioe); return null; } - LOG.info("Closed wap " + wap.p + " (wrote " + wap.editsWritten + " edits in " - + (wap.nanosSpent / 1000 / 1000) + "ms)"); + LOG.info("Closed " + wap.p + "; wrote " + wap.editsWritten + " edit(s) in " + + (wap.nanosSpent / 1000 / 1000) + "ms"); if (wap.editsWritten == 0) { // just remove the empty recovered.edits file @@ -1490,8 +1502,8 @@ public class WALSplitter { } } Writer w = createWriter(regionedits); - LOG.info("Creating writer path=" + regionedits + " region=" + Bytes.toStringBinary(region)); - return (new WriterAndPath(regionedits, w)); + LOG.debug("Creating writer path=" + regionedits); + return new WriterAndPath(regionedits, w); } private void filterCellByStore(Entry logEntry) { @@ -1544,9 +1556,9 @@ public class WALSplitter { filterCellByStore(logEntry); if (!logEntry.getEdit().isEmpty()) { wap.w.append(logEntry); + this.updateRegionMaximumEditLogSeqNum(logEntry); + editsCount++; } - this.updateRegionMaximumEditLogSeqNum(logEntry); - editsCount++; } // Pass along summary statistics wap.incrementEdits(editsCount);