commit 3cabb4c8259a152c999327b73345fdd43455c0c8
Author: stack
Date: Tue Feb 2 13:43:04 2016 -0800

HBASE-15158 Change order in which we do write pipeline operations; do all under row locks!

M hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java
White space

M hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
Removed the timeout on the latch we block on waiting for sequenceid assignment. In flush, add doc and formatting. Be sure to complete mvcc on exit. Formatting is in part because of complaints that the method is too long, so broke out stuff like the fat logging into its own methods. Undid an extraneous mvcc transaction that wrapped flush. This changed indent. Also moved call to sync out to WALUtil call. Added logFatLineOnFlush, doAbortFlushToWAL, and doSyncOfUnflushedWALChanges methods to make the flush method smaller. Changed getNextSequenceId so it no longer adds a fake edit to the WAL only to have it discarded. Since the source of sequenceids is now inside mvcc, just get the next one. Renamed BatchOperationInProgress to BatchOperation and renamed doMiniBatchMutation as doMiniBatchMutate so it goes with its caller, batchMutate. Some refactoring of doMiniBatchMutate to make it slightly smaller by adding doc, breaking out methods, and removing redundancies, but not enough. Needs a rewrite. Applied the new common treatment of append/sync/mvcc, careful at the end to call complete on the mvcc before skipping out. This pattern is done in all WAL writing in this patch. Also includes the flip in order: we now do mvcc, append, sync, update memstore, stop mvcc, instead of the previous mvcc, append, memstore, sync, mvcc. Redid increment and append. They were 95% the same thing. Both now call doDelta. Renamed doGet as get. Removed main and a few DLR methods no longer used.

M hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
Less dancing.

M hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
Use methods because I want to clean up all the variants of sequenceid in here.

M hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
Cleanup, and do the complete append, sync and mvcc in here rather than in the caller. That is ok because methods in here are adding to WAL only, not to mem.

M hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
Doc. Cleanup.

M hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
Test changes to go with the above. Used to expect 'true' though we were asked to skip the WAL, because our trick of doing a fake empty append was tripping the coprocessor append flag.

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java index 2984754..f61d871 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java @@ -457,7 +457,6 @@ public class DefaultMemStore implements MemStore { * This is called under row lock, so Get operations will still see updates * atomically. Scans will only see each KeyValue update as atomic.
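(Aside, before the diff detail: the reordered pipeline the message above describes, as a minimal sketch. Illustration only, not the patch's code; writeOneBatch and applyToMemstore are invented names, while wal.append, sync, and the mvcc calls are the ones the patch uses.)

    // Sketch: new order is append (which begins the mvcc txn), sync, memstore,
    // complete mvcc. The old order wrote the memstore before the sync, which is
    // why a failed sync used to require rolling the memstore back.
    private void writeOneBatch(WALKey walKey, WALEdit walEdit, Durability durability)
        throws IOException {
      WriteEntry writeEntry = null;
      try {
        long txid = wal.append(htableDescriptor, getRegionInfo(), walKey, walEdit, true);
        if (txid != 0) sync(txid, durability);   // 2. sync BEFORE touching memstore
        writeEntry = walKey.getWriteEntry();     // 1. append began the mvcc txn
        applyToMemstore(walEdit, writeEntry.getWriteNumber()); // 3. memstore (stand-in)
        mvcc.completeAndWait(writeEntry);        // 4. publish: visible to readers
        writeEntry = null;                       // done; finally must not re-complete
      } finally {
        // Failure before step 4: nothing hit the memstore, so no rollback is
        // needed; just release the mvcc slot so the queue does not block.
        if (writeEntry != null) mvcc.complete(writeEntry);
      }
    }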
* - * @param cells * @param readpoint readpoint below which we can safely remove duplicate KVs * @return change in memstore size */ @@ -578,7 +577,7 @@ public class DefaultMemStore implements MemStore { // last iterated Cells for cellSet and snapshot (to restore iterator state after reseek) private Cell cellSetItRow = null; private Cell snapshotItRow = null; - + // iterator based scanning. private Iterator cellSetIt; private Iterator snapshotIt; @@ -593,7 +592,7 @@ public class DefaultMemStore implements MemStore { // The allocator and snapshot allocator at the time of creating this scanner volatile MemStoreLAB allocatorAtCreation; volatile MemStoreLAB snapshotAllocatorAtCreation; - + // A flag represents whether could stop skipping Cells for MVCC // if have encountered the next row. Only used for reversed scan private boolean stopSkippingCellsIfNextRow = false; @@ -806,7 +805,7 @@ public class DefaultMemStore implements MemStore { this.cellSetIt = null; this.snapshotIt = null; - + if (allocatorAtCreation != null) { this.allocatorAtCreation.decScannerCount(); this.allocatorAtCreation = null; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 3cf4122..e7f34d7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -1,5 +1,4 @@ /* - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -29,6 +28,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -41,6 +41,7 @@ import java.util.NavigableSet; import java.util.RandomAccess; import java.util.Set; import java.util.TreeMap; +import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ConcurrentHashMap; @@ -69,7 +70,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.ArrayBackedTag; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellScanner; @@ -77,7 +77,6 @@ import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CompoundConfiguration; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.DroppedSnapshotException; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants.OperationStatusCode; @@ -93,7 +92,6 @@ import org.apache.hadoop.hbase.ShareableMemory; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.TagRewriteCell; -import org.apache.hadoop.hbase.TagType; import org.apache.hadoop.hbase.TagUtil; import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.backup.HFileArchiver; @@ -112,7 +110,7 @@ import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.conf.ConfigurationManager; import 
org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver; -import org.apache.hadoop.hbase.coprocessor.RegionObserver; +import org.apache.hadoop.hbase.coprocessor.RegionObserver.MutationType; import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare; import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException; @@ -123,8 +121,6 @@ import org.apache.hadoop.hbase.filter.FilterWrapper; import org.apache.hadoop.hbase.filter.IncompatibleFilterException; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.TimeRange; -import org.apache.hadoop.hbase.io.hfile.BlockCache; -import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.ipc.CallerDisconnectedException; import org.apache.hadoop.hbase.ipc.RpcCallContext; @@ -148,6 +144,7 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.Stor import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor.EventType; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor; +import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry; import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; @@ -169,7 +166,6 @@ import org.apache.hadoop.hbase.util.CompressionTest; import org.apache.hadoop.hbase.util.Counter; import org.apache.hadoop.hbase.util.EncryptionTest; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.FSTableDescriptors; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.HashedBytes; import org.apache.hadoop.hbase.util.Pair; @@ -199,6 +195,7 @@ import com.google.protobuf.RpcController; import com.google.protobuf.Service; import com.google.protobuf.TextFormat; +@SuppressWarnings("deprecation") @InterfaceAudience.Private public class HRegion implements HeapSize, PropagatingConfigurationObserver, Region { private static final Log LOG = LogFactory.getLog(HRegion.class); @@ -207,18 +204,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi "hbase.hregion.scan.loadColumnFamiliesOnDemand"; /** - * Longest time we'll wait on a sequenceid. - * Sequenceid comes up out of the WAL subsystem. WAL subsystem can go bad or a test might use - * it without cleaning up previous usage properly; generally, a WAL roll is needed. The timeout - * is for a latch in WALKey. There is no global accounting of outstanding WALKeys; intentionally - * to avoid contention, but it makes it so if an abort or problem, we could be stuck waiting - * on the WALKey latch. Revisit. - */ - private final int maxWaitForSeqId; - private static final String MAX_WAIT_FOR_SEQ_ID_KEY = "hbase.hregion.max.wait.for.sequenceid.ms"; - private static final int DEFAULT_MAX_WAIT_FOR_SEQ_ID = 30000; - - /** * This is the global default value for durability. All tables/mutations not * defining a durability or using USE_DEFAULT will default to this value. 
*/ @@ -282,7 +267,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi final Counter checkAndMutateChecksPassed = new Counter(); final Counter checkAndMutateChecksFailed = new Counter(); - //Number of requests + // Number of requests final Counter readRequestsCount = new Counter(); final Counter writeRequestsCount = new Counter(); @@ -356,7 +341,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi */ private boolean disallowWritesInRecovering = false; - // when a region is in recovering state, it can only accept writes not reads + // When a region is in recovering state, it can only accept writes not reads private volatile boolean recovering = false; private volatile Optional configurationManager; @@ -373,7 +358,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // We achieve this by synchronizing on the scannerReadPoints object. synchronized(scannerReadPoints) { minimumReadPoint = mvcc.getReadPoint(); - for (Long readPoint: this.scannerReadPoints.values()) { if (readPoint < minimumReadPoint) { minimumReadPoint = readPoint; @@ -673,7 +657,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration", DEFAULT_ROWLOCK_WAIT_DURATION); - this.maxWaitForSeqId = conf.getInt(MAX_WAIT_FOR_SEQ_ID_KEY, DEFAULT_MAX_WAIT_FOR_SEQ_ID); this.isLoadingCfsOnDemandDefault = conf.getBoolean(LOAD_CFS_ON_DEMAND_CONFIG_KEY, true); this.htableDescriptor = htd; this.rsServices = rsServices; @@ -1177,7 +1160,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi */ public void setRecovering(boolean newState) { boolean wasRecovering = this.recovering; - // before we flip the recovering switch (enabling reads) we should write the region open + // Before we flip the recovering switch (enabling reads) we should write the region open // event to WAL if needed if (wal != null && getRegionServerServices() != null && !writestate.readOnly && wasRecovering && !newState) { @@ -2045,7 +2028,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi /** * Should the store be flushed because it is old enough. *

- * Every FlushPolicy should call this to determine whether a store is old enough to flush(except + * Every FlushPolicy should call this to determine whether a store is old enough to flush (except * that you always flush all stores). Otherwise the {@link #shouldFlush()} method will always * returns true which will make a lot of flush requests. */ @@ -2146,19 +2129,12 @@ * for say installing a bulk loaded file just ahead of the last hfile that was * the result of this flush, etc. * - * @param wal - * Null if we're NOT to go via wal. - * @param myseqid - * The seqid to use if wal is null writing out flush - * file. - * @param storesToFlush - * The list of stores to flush. + * @param wal Null if we're NOT to go via wal. + * @param myseqid The seqid to use if wal is null writing out flush file. + * @param storesToFlush The list of stores to flush. * @return object describing the flush's state - * @throws IOException - * general io exceptions - * @throws DroppedSnapshotException - * Thrown when replay of wal is required because a Snapshot was not - * properly persisted. + * @throws IOException general io exceptions + * @throws DroppedSnapshotException Thrown when replay of WAL is required. */ protected FlushResult internalFlushcache(final WAL wal, final long myseqid, final Collection storesToFlush, MonitoredTask status, boolean writeFlushWalMarker) @@ -2182,65 +2158,48 @@ throw new IOException("Aborting flush because server is aborted..."); } final long startTime = EnvironmentEdgeManager.currentTime(); - // If nothing to flush, return, but we need to safely update the region sequence id + // If nothing to flush, return, but return with a valid unused sequenceId. + // It's needed by bulk upload IIRC. It flushes until no edits in memory so it can insert a + // bulk loaded file between memory and existing hfiles. It wants a good sequenceId that belongs + // to no other edit, one it can use to associate with the bulk load. Hence this little dance below + // to go get one. if (this.memstoreSize.get() <= 0) { - // Take an update lock because am about to change the sequence id and we want the sequence id - // to be at the border of the empty memstore. - MultiVersionConcurrencyControl.WriteEntry writeEntry = null; + // Take an update lock so no edits can come into memory just yet. this.updatesLock.writeLock().lock(); + WriteEntry writeEntry = null; try { if (this.memstoreSize.get() <= 0) { // Presume that if there are still no edits in the memstore, then there are no edits for // this region out in the WAL subsystem so no need to do any trickery clearing out - // edits in the WAL system. Up the sequence number so the resulting flush id is for - // sure just beyond the last appended region edit (useful as a marker when bulk loading, - // etc.). NOTE: The writeEntry write number is NOT in the WAL.. there is no WAL writing - // here. + // edits in the WAL sub-system. Up the sequence number so the resulting flush id is for + // sure just beyond the last appended region edit and not associated with any edit + // (useful as marker when bulk loading, etc.).
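(Aside: the "little dance" described in the comment above amounts to reserving and immediately completing an mvcc transaction; the write number handed out is a sequenceid that no edit owns. A minimal sketch mirroring the patch's getNextSequenceId; the method name here is invented:)

    // Get a sequenceid strictly beyond every appended edit without writing
    // anything to the WAL (the old code appended a fake empty edit instead).
    private long nextUnusedSequenceId() {
      WriteEntry we = mvcc.begin();    // reserve the next write number
      long seqId = we.getWriteNumber();
      mvcc.completeAndWait(we);        // publish; the read point advances past it
      return seqId;                    // e.g. the flushOpSeqId for an empty flush
    }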
+ FlushResult flushResult = null; if (wal != null) { writeEntry = mvcc.begin(); long flushOpSeqId = writeEntry.getWriteNumber(); - FlushResult flushResult = new FlushResultImpl( - FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, - flushOpSeqId, - "Nothing to flush", - writeFlushRequestMarkerToWAL(wal, writeFlushWalMarker)); - // TODO: Lets see if we hang here, if there is a scenario where an outstanding reader - // with a read point is in advance of this write point. + flushResult = new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, + flushOpSeqId, "Nothing to flush", + writeFlushRequestMarkerToWAL(wal, writeFlushWalMarker)); mvcc.completeAndWait(writeEntry); + // Set to null so we don't complete it again down in finally block. writeEntry = null; return new PrepareFlushResult(flushResult, myseqid); } else { - return new PrepareFlushResult( - new FlushResultImpl( - FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, - "Nothing to flush", - false), - myseqid); + return new PrepareFlushResult(new FlushResultImpl( + FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, "Nothing to flush", false), myseqid); } } } finally { - this.updatesLock.writeLock().unlock(); if (writeEntry != null) { + // If writeEntry is non-null, this operation failed; the mvcc transaction failed... + // but complete it anyways so it doesn't block the mvcc queue. mvcc.complete(writeEntry); } + this.updatesLock.writeLock().unlock(); } } - - if (LOG.isInfoEnabled()) { - // Log a fat line detailing what is being flushed. - StringBuilder perCfExtras = null; - if (!isAllFamilies(storesToFlush)) { - perCfExtras = new StringBuilder(); - for (Store store: storesToFlush) { - perCfExtras.append("; ").append(store.getColumnFamilyName()); - perCfExtras.append("=").append(StringUtils.byteDesc(store.getMemStoreSize())); - } - } - LOG.info("Flushing " + + storesToFlush.size() + "/" + stores.size() + - " column families, memstore=" + StringUtils.byteDesc(this.memstoreSize.get()) + - ((perCfExtras != null && perCfExtras.length() > 0)? perCfExtras.toString(): "") + - ((wal != null) ? "" : "; WAL is null, using passed sequenceid=" + myseqid)); - } + logFatLineOnFlush(storesToFlush, myseqid); // Stop updates while we snapshot the memstore of all of these regions' stores. We only have // to do this for a moment. It is quick. We also set the memstore size to zero here before we // allow updates again so its value will represent the size of the updates received @@ -2251,8 +2210,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi status.setStatus("Obtaining lock to block concurrent updates"); // block waiting for the lock for internal flush this.updatesLock.writeLock().lock(); - status.setStatus("Preparing to flush by snapshotting stores in " + - getRegionInfo().getEncodedName()); + status.setStatus("Preparing flush snapshotting stores in " + getRegionInfo().getEncodedName()); long totalFlushableSizeOfFlushableStores = 0; Set flushedFamilyNames = new HashSet(); @@ -2274,109 +2232,117 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // will be in advance of this sequence id. 
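(Aside: a worked reading of the flushedSeqId computation just below, pulled out as a stand-alone sketch; the 117 in the comment is an invented sequenceid:)

    // Sketch: the last 'flushed' edit, given the oldest edit still in memstore.
    static long flushedSeqId(long earliestUnflushed, long flushOpSeqId) {
      // e.g. earliestUnflushed == 117 means everything up to and including 116
      // is durable in hfiles once this flush completes.
      return earliestUnflushed == HConstants.NO_SEQNUM
          ? flushOpSeqId            // no outstanding edits; flush id marks the point
          : earliestUnflushed - 1;  // back up one from the oldest unflushed edit
    }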
long flushedSeqId = HConstants.NO_SEQNUM; byte[] encodedRegionName = getRegionInfo().getEncodedNameAsBytes(); - - long trxId = 0; - MultiVersionConcurrencyControl.WriteEntry writeEntry = mvcc.begin(); try { - try { - if (wal != null) { - Long earliestUnflushedSequenceIdForTheRegion = + if (wal != null) { + Long earliestUnflushedSequenceIdForTheRegion = wal.startCacheFlush(encodedRegionName, flushedFamilyNames); - if (earliestUnflushedSequenceIdForTheRegion == null) { - // This should never happen. This is how startCacheFlush signals flush cannot proceed. - String msg = this.getRegionInfo().getEncodedName() + " flush aborted; WAL closing."; - status.setStatus(msg); - return new PrepareFlushResult( + if (earliestUnflushedSequenceIdForTheRegion == null) { + // This should never happen. This is how startCacheFlush signals flush cannot proceed. + String msg = this.getRegionInfo().getEncodedName() + " flush aborted; WAL closing."; + status.setStatus(msg); + return new PrepareFlushResult( new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false), myseqid); - } - flushOpSeqId = getNextSequenceId(wal); - // Back up 1, minus 1 from oldest sequence id in memstore to get last 'flushed' edit - flushedSeqId = - earliestUnflushedSequenceIdForTheRegion.longValue() == HConstants.NO_SEQNUM? - flushOpSeqId: earliestUnflushedSequenceIdForTheRegion.longValue() - 1; - } else { - // use the provided sequence Id as WAL is not being used for this flush. - flushedSeqId = flushOpSeqId = myseqid; } + flushOpSeqId = getNextSequenceId(wal); + // Back up 1, minus 1 from oldest sequence id in memstore to get last 'flushed' edit + flushedSeqId = + earliestUnflushedSequenceIdForTheRegion.longValue() == HConstants.NO_SEQNUM? + flushOpSeqId: earliestUnflushedSequenceIdForTheRegion.longValue() - 1; + } else { + // use the provided sequence Id as WAL is not being used for this flush. + flushedSeqId = flushOpSeqId = myseqid; + } - for (Store s : storesToFlush) { - totalFlushableSizeOfFlushableStores += s.getFlushableSize(); - storeFlushCtxs.put(s.getFamily().getName(), s.createFlushContext(flushOpSeqId)); - committedFiles.put(s.getFamily().getName(), null); // for writing stores to WAL - storeFlushableSize.put(s.getFamily().getName(), s.getFlushableSize()); - } + for (Store s : storesToFlush) { + totalFlushableSizeOfFlushableStores += s.getFlushableSize(); + storeFlushCtxs.put(s.getFamily().getName(), s.createFlushContext(flushOpSeqId)); + committedFiles.put(s.getFamily().getName(), null); // for writing stores to WAL + storeFlushableSize.put(s.getFamily().getName(), s.getFlushableSize()); + } - // write the snapshot start to WAL - if (wal != null && !writestate.readOnly) { - FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH, + // write the snapshot start to WAL + if (wal != null && !writestate.readOnly) { + FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH, getRegionInfo(), flushOpSeqId, committedFiles); - // no sync. 
Sync is below where we do not hold the updates lock - trxId = WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), - desc, false, mvcc); - } - - // Prepare flush (take a snapshot) - for (StoreFlushContext flush : storeFlushCtxs.values()) { - flush.prepare(); - } - } catch (IOException ex) { - if (wal != null) { - if (trxId > 0) { // check whether we have already written START_FLUSH to WAL - try { - FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH, - getRegionInfo(), flushOpSeqId, committedFiles); - WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), - desc, false, mvcc); - } catch (Throwable t) { - LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" + - StringUtils.stringifyException(t)); - // ignore this since we will be aborting the RS with DSE. - } - } - // we have called wal.startCacheFlush(), now we have to abort it - wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes()); - throw ex; // let upper layers deal with it. - } - } finally { - this.updatesLock.writeLock().unlock(); - } - String s = "Finished memstore snapshotting " + this + - ", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSizeOfFlushableStores; - status.setStatus(s); - if (LOG.isTraceEnabled()) LOG.trace(s); - // sync unflushed WAL changes - // see HBASE-8208 for details - if (wal != null) { - try { - wal.sync(); // ensure that flush marker is sync'ed - } catch (IOException ioe) { - wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes()); - throw ioe; - } + // No sync. Sync is below where no updates lock and we do FlushAction.COMMIT_FLUSH + WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), desc, false, mvcc); } - // wait for all in-progress transactions to commit to WAL before - // we can start the flush. This prevents - // uncommitted transactions from being written into HFiles. - // We have to block before we start the flush, otherwise keys that - // were removed via a rollbackMemstore could be written to Hfiles. - mvcc.completeAndWait(writeEntry); - // set writeEntry to null to prevent mvcc.complete from being called again inside finally - // block - writeEntry = null; - } finally { - if (writeEntry != null) { - // In case of failure just mark current writeEntry as complete. - mvcc.complete(writeEntry); + // Prepare flush (take a snapshot) + for (StoreFlushContext flush : storeFlushCtxs.values()) { + flush.prepare(); } + } catch (IOException ex) { + doAbortFlushToWAL(wal, flushOpSeqId, committedFiles); + throw ex; + } finally { + this.updatesLock.writeLock().unlock(); } + String s = "Finished memstore snapshotting " + this + ", syncing WAL and waiting on mvcc, " + + "flushsize=" + totalFlushableSizeOfFlushableStores; + status.setStatus(s); + doSyncOfUnflushedWALChanges(wal, getRegionInfo()); return new PrepareFlushResult(storeFlushCtxs, committedFiles, storeFlushableSize, startTime, flushOpSeqId, flushedSeqId, totalFlushableSizeOfFlushableStores); } /** - * @param families + * Utility method broken out of internalPrepareFlushCache so that method is smaller. + */ + private void logFatLineOnFlush(final Collection storesToFlush, final long sequenceId) { + if (!LOG.isInfoEnabled()) { + return; + } + // Log a fat line detailing what is being flushed. 
+ StringBuilder perCfExtras = null; + if (!isAllFamilies(storesToFlush)) { + perCfExtras = new StringBuilder(); + for (Store store: storesToFlush) { + perCfExtras.append("; ").append(store.getColumnFamilyName()); + perCfExtras.append("=").append(StringUtils.byteDesc(store.getMemStoreSize())); + } + } + LOG.info("Flushing " + + storesToFlush.size() + "/" + stores.size() + + " column families, memstore=" + StringUtils.byteDesc(this.memstoreSize.get()) + + ((perCfExtras != null && perCfExtras.length() > 0)? perCfExtras.toString(): "") + + ((wal != null) ? "" : "; WAL is null, using passed sequenceid=" + sequenceId)); + } + + private void doAbortFlushToWAL(final WAL wal, final long flushOpSeqId, + final Map> committedFiles) { + if (wal == null) return; + try { + FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH, + getRegionInfo(), flushOpSeqId, committedFiles); + WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), desc, false, + mvcc); + } catch (Throwable t) { + LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL:" + + StringUtils.stringifyException(t)); + // ignore this since we will be aborting the RS with DSE. + } + // we have called wal.startCacheFlush(), now we have to abort it + wal.abortCacheFlush(this.getRegionInfo().getEncodedNameAsBytes()); + } + + /** + * Sync unflushed WAL changes. See HBASE-8208 for details + */ + private static void doSyncOfUnflushedWALChanges(final WAL wal, final HRegionInfo hri) + throws IOException { + if (wal == null) { + return; + } + try { + wal.sync(); // ensure that flush marker is sync'ed + } catch (IOException ioe) { + wal.abortCacheFlush(hri.getEncodedNameAsBytes()); + throw ioe; + } + } + + /** * @return True if passed Set is all families in the region. */ private boolean isAllFamilies(final Collection families) { @@ -2394,8 +2360,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.CANNOT_FLUSH, getRegionInfo(), -1, new TreeMap>(Bytes.BYTES_COMPARATOR)); try { - WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), - desc, true, mvcc); + WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), desc, true, mvcc); return true; } catch (IOException e) { LOG.warn(getRegionInfo().getEncodedName() + " : " @@ -2465,8 +2430,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // write flush marker to WAL. If fail, we should throw DroppedSnapshotException FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH, getRegionInfo(), flushOpSeqId, committedFiles); - WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), - desc, true, mvcc); + WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), desc, true, mvcc); } } catch (Throwable t) { // An exception here means that the snapshot was not persisted. 
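(Aside: taken together, the marker writes above and below follow a small protocol; a schematic sketch. The calls are the ones in the diff, the ordering is the point:)

    // WAL flush-marker lifecycle across the prepare/commit/abort paths:
    //   1. START_FLUSH  - written under the updates lock, sync deferred (false)
    //   2. COMMIT_FLUSH - written and synced (true) once store files are committed
    //   3. ABORT_FLUSH  - written best-effort on failure, then wal.abortCacheFlush()
    FlushDescriptor commit = ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH,
        getRegionInfo(), flushOpSeqId, committedFiles);
    WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), commit, true, mvcc);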
@@ -2479,8 +2443,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi try { FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH, getRegionInfo(), flushOpSeqId, committedFiles); - WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), - desc, false, mvcc); + WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), desc, false, mvcc); } catch (Throwable ex) { LOG.warn(getRegionInfo().getEncodedName() + " : " + "failed writing ABORT_FLUSH marker to WAL", ex); @@ -2551,15 +2514,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi */ @VisibleForTesting protected long getNextSequenceId(final WAL wal) throws IOException { - // TODO: For review. Putting an empty edit in to get a sequenceid out will not work if the - // WAL is banjaxed... if it has gotten an exception and the WAL has not yet been rolled or - // aborted. In this case, we'll just get stuck here. For now, until HBASE-12751, just have - // a timeout. May happen in tests after we tightened the semantic via HBASE-14317. - // Also, the getSequenceId blocks on a latch. There is no global list of outstanding latches - // so if an abort or stop, there is no way to call them in. - WALKey key = this.appendEmptyEdit(wal); - mvcc.complete(key.getWriteEntry()); - return key.getSequenceId(this.maxWaitForSeqId); + WriteEntry we = mvcc.begin(); + mvcc.completeAndWait(we); + return we.getWriteNumber(); } ////////////////////////////////////////////////////////////////////////////// @@ -2748,13 +2705,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * accumulating status codes and tracking the index at which processing * is proceeding. */ - private abstract static class BatchOperationInProgress { + private abstract static class BatchOperation { T[] operations; int nextIndexToProcess = 0; OperationStatus[] retCodeDetails; WALEdit[] walEditsFromCoprocessors; - public BatchOperationInProgress(T[] operations) { + public BatchOperation(T[] operations) { this.operations = operations; this.retCodeDetails = new OperationStatus[operations.length]; this.walEditsFromCoprocessors = new WALEdit[operations.length]; @@ -2774,7 +2731,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } - private static class MutationBatch extends BatchOperationInProgress { + private static class MutationBatch extends BatchOperation { private long nonceGroup; private long nonce; public MutationBatch(Mutation[] operations, long nonceGroup, long nonce) { @@ -2814,7 +2771,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } - private static class ReplayBatch extends BatchOperationInProgress { + private static class ReplayBatch extends BatchOperation { private long replaySeqId = 0; public ReplayBatch(MutationReplay[] operations, long seqId) { super(operations); @@ -2900,7 +2857,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * OperationStatusCode and the exceptionMessage if any. * @throws IOException */ - OperationStatus[] batchMutate(BatchOperationInProgress batchOp) throws IOException { + OperationStatus[] batchMutate(BatchOperation batchOp) throws IOException { boolean initialized = false; Operation op = batchOp.isInReplay() ? 
Operation.REPLAY_BATCH_MUTATE : Operation.BATCH_MUTATE; startRegionOperation(op); @@ -2914,11 +2871,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (!initialized) { this.writeRequestsCount.add(batchOp.operations.length); if (!batchOp.isInReplay()) { - doPreMutationHook(batchOp); + doPreBatchMutateHook(batchOp); } initialized = true; } - long addedSize = doMiniBatchMutation(batchOp); + long addedSize = doMiniBatchMutate(batchOp); long newSize = this.addAndGetGlobalMemstoreSize(addedSize); if (isFlushSize(newSize)) { requestFlush(); @@ -2930,8 +2887,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return batchOp.retCodeDetails; } - - private void doPreMutationHook(BatchOperationInProgress batchOp) + private void doPreBatchMutateHook(BatchOperation batchOp) throws IOException { /* Run coprocessor pre hook outside of locks to avoid deadlock */ WALEdit walEdit = new WALEdit(); @@ -2970,103 +2926,58 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } + /** + * Called to do a piece of the batch that came in to {@link #batchMutate(Mutation[], long, long)} + * In here we also handle replay of edits on region recover. + * @return Change in size brought about by applying batchOp + */ @SuppressWarnings("unchecked") - private long doMiniBatchMutation(BatchOperationInProgress batchOp) throws IOException { - boolean isInReplay = batchOp.isInReplay(); - // variable to note if all Put items are for the same CF -- metrics related + // TODO: This needs a rewrite. Doesn't have to be this long. St.Ack 20160120 + private long doMiniBatchMutate(BatchOperation batchOp) throws IOException { + boolean replay = batchOp.isInReplay(); + // Variable to note if all Put items are for the same CF -- metrics related boolean putsCfSetConsistent = true; - //The set of columnFamilies first seen for Put. - Set putsCfSet = null; - // variable to note if all Delete items are for the same CF -- metrics related + // Variable to note if all Delete items are for the same CF -- metrics related boolean deletesCfSetConsistent = true; - //The set of columnFamilies first seen for Delete. + // The set of columnFamilies first seen for Put. + Set putsCfSet = null; + // The set of columnFamilies first seen for Delete. 
Set deletesCfSet = null; - - long currentNonceGroup = HConstants.NO_NONCE, currentNonce = HConstants.NO_NONCE; - WALEdit walEdit = new WALEdit(isInReplay); - MultiVersionConcurrencyControl.WriteEntry writeEntry = null; - long txid = 0; - boolean doRollBackMemstore = false; + long currentNonceGroup = HConstants.NO_NONCE; + long currentNonce = HConstants.NO_NONCE; + WALEdit walEdit = new WALEdit(replay); boolean locked = false; - - /** Keep track of the locks we hold so we can release them in finally clause */ - List acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length); // reference family maps directly so coprocessors can mutate them if desired Map>[] familyMaps = new Map[batchOp.operations.length]; // We try to set up a batch in the range [firstIndex,lastIndexExclusive) int firstIndex = batchOp.nextIndexToProcess; int lastIndexExclusive = firstIndex; boolean success = false; - int noOfPuts = 0, noOfDeletes = 0; - WALKey walKey = null; - long mvccNum = 0; + int noOfPuts = 0; + int noOfDeletes = 0; + WriteEntry writeEntry = null; + /** Keep track of the locks we hold so we can release them in finally clause */ + List acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length); try { - // ------------------------------------ - // STEP 1. Try to acquire as many locks as we can, and ensure - // we acquire at least one. - // ---------------------------------- + // STEP 1. Try to acquire as many locks as we can, and ensure we acquire at least one. int numReadyToWrite = 0; long now = EnvironmentEdgeManager.currentTime(); while (lastIndexExclusive < batchOp.operations.length) { - Mutation mutation = batchOp.getMutation(lastIndexExclusive); - boolean isPutMutation = mutation instanceof Put; - - Map> familyMap = mutation.getFamilyCellMap(); - // store the family map reference to allow for mutations - familyMaps[lastIndexExclusive] = familyMap; - - // skip anything that "ran" already - if (batchOp.retCodeDetails[lastIndexExclusive].getOperationStatusCode() - != OperationStatusCode.NOT_RUN) { - lastIndexExclusive++; - continue; - } - - try { - if (isPutMutation) { - // Check the families in the put. If bad, skip this one. - if (isInReplay) { - removeNonExistentColumnFamilyForReplay(familyMap); - } else { - checkFamilies(familyMap.keySet()); - } - checkTimestamps(mutation.getFamilyCellMap(), now); - } else { - prepareDelete((Delete) mutation); - } - checkRow(mutation.getRow(), "doMiniBatchMutation"); - } catch (NoSuchColumnFamilyException nscf) { - LOG.warn("No such column family in batch mutation", nscf); - batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus( - OperationStatusCode.BAD_FAMILY, nscf.getMessage()); - lastIndexExclusive++; - continue; - } catch (FailedSanityCheckException fsce) { - LOG.warn("Batch Mutation did not pass sanity check", fsce); - batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus( - OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage()); - lastIndexExclusive++; - continue; - } catch (WrongRegionException we) { - LOG.warn("Batch mutation had a row that does not belong to this region", we); - batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus( - OperationStatusCode.SANITY_CHECK_FAILURE, we.getMessage()); + if (checkBatchOp(batchOp, lastIndexExclusive, familyMaps, now)) { lastIndexExclusive++; continue; } - - // If we haven't got any rows in our batch, we should block to - // get the next one. 
+ Mutation mutation = batchOp.getMutation(lastIndexExclusive); + // If we haven't got any rows in our batch, we should block to get the next one. RowLock rowLock = null; try { rowLock = getRowLock(mutation.getRow(), true); } catch (IOException ioe) { - LOG.warn("Failed getting lock in batch put, row=" - + Bytes.toStringBinary(mutation.getRow()), ioe); + LOG.warn("Failed getting lock, row=" + Bytes.toStringBinary(mutation.getRow()), ioe); } if (rowLock == null) { // We failed to grab another lock - break; // stop acquiring more rows for this batch + break; // Stop acquiring more rows for this batch } else { acquiredRowLocks.add(rowLock); } @@ -3074,9 +2985,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi lastIndexExclusive++; numReadyToWrite++; - if (isPutMutation) { + if (mutation instanceof Put) { // If Column Families stay consistent through out all of the - // individual puts then metrics can be reported as a mutliput across + // individual puts then metrics can be reported as a multiput across // column families in the first put. if (putsCfSet == null) { putsCfSet = mutation.getFamilyCellMap().keySet(); @@ -3094,23 +3005,26 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } - // we should record the timestamp only after we have acquired the rowLock, + // We've now grabbed as many mutations off the list as we can + + // STEP 2. Update any LATEST_TIMESTAMP timestamps + // We should record the timestamp only after we have acquired the rowLock, // otherwise, newer puts/deletes are not guaranteed to have a newer timestamp now = EnvironmentEdgeManager.currentTime(); byte[] byteNow = Bytes.toBytes(now); // Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily? - if (numReadyToWrite <= 0) return 0L; - - // We've now grabbed as many mutations off the list as we can + if (numReadyToWrite <= 0) { + return 0L; + } - // ------------------------------------ - // STEP 2. Update any LATEST_TIMESTAMP timestamps - // ---------------------------------- - for (int i = firstIndex; !isInReplay && i < lastIndexExclusive; i++) { + for (int i = firstIndex; !replay && i < lastIndexExclusive; i++) { // skip invalid if (batchOp.retCodeDetails[i].getOperationStatusCode() - != OperationStatusCode.NOT_RUN) continue; + != OperationStatusCode.NOT_RUN) { + // lastIndexExclusive was incremented above. + continue; + } Mutation mutation = batchOp.getMutation(i); if (mutation instanceof Put) { @@ -3127,16 +3041,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi locked = true; // calling the pre CP hook for batch mutation - if (!isInReplay && coprocessorHost != null) { + if (!replay && coprocessorHost != null) { MiniBatchOperationInProgress miniBatchOp = new MiniBatchOperationInProgress(batchOp.getMutationsForCoprocs(), batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); - if (coprocessorHost.preBatchMutate(miniBatchOp)) return 0L; + if (coprocessorHost.preBatchMutate(miniBatchOp)) { + return 0L; + } } - // ------------------------------------ // STEP 3. 
Build WAL edit - // ---------------------------------- Durability durability = Durability.USE_DEFAULT; for (int i = firstIndex; i < lastIndexExclusive; i++) { // Skip puts that were determined to be invalid during preprocessing @@ -3154,26 +3068,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi continue; } - long nonceGroup = batchOp.getNonceGroup(i), nonce = batchOp.getNonce(i); + long nonceGroup = batchOp.getNonceGroup(i); + long nonce = batchOp.getNonce(i); // In replay, the batch may contain multiple nonces. If so, write WALEdit for each. // Given how nonces are originally written, these should be contiguous. // They don't have to be, it will still work, just write more WALEdits than needed. if (nonceGroup != currentNonceGroup || nonce != currentNonce) { - if (walEdit.size() > 0) { - assert isInReplay; - if (!isInReplay) { - throw new IOException("Multiple nonces per batch and not in replay"); - } - // txid should always increase, so having the one from the last call is ok. - // we use HLogKey here instead of WALKey directly to support legacy coprocessors. - walKey = new ReplayHLogKey(this.getRegionInfo().getEncodedNameAsBytes(), - this.htableDescriptor.getTableName(), now, m.getClusterIds(), - currentNonceGroup, currentNonce, mvcc); - txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, - walEdit, true); - walEdit = new WALEdit(isInReplay); - walKey = null; - } + // Write what we have so far for nonces out to WAL + appendCurrentNonces(m, replay, walEdit, now, currentNonceGroup, currentNonce); + walEdit = new WALEdit(replay); currentNonceGroup = nonceGroup; currentNonce = nonce; } @@ -3188,107 +3091,83 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi addFamilyMapToWALEdit(familyMaps[i], walEdit); } - // ------------------------- - // STEP 4. Append the final edit to WAL. Do not sync wal. - // ------------------------- + // STEP 4. Append the final edit to WAL and sync. Mutation mutation = batchOp.getMutation(firstIndex); - if (isInReplay) { + WALKey walKey = null; + if (replay) { // use wal key from the original walKey = new ReplayHLogKey(this.getRegionInfo().getEncodedNameAsBytes(), this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc); - long replaySeqId = batchOp.getReplaySequenceId(); - walKey.setOrigLogSeqNum(replaySeqId); - } - if (walEdit.size() > 0) { - if (!isInReplay) { - // we use HLogKey here instead of WALKey directly to support legacy coprocessors. - walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), + walKey.setOrigLogSeqNum(batchOp.getReplaySequenceId()); + } + // Not sure what is going on here when replay is going on... does the below append get + // called for replayed edits? Am afraid to change it without test. + if (!walEdit.isEmpty()) { + if (!replay) { + // we use HLogKey here instead of WALKey directly to support legacy coprocessors. + walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc); } - txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true); + // TODO: Use the doAppend methods below... complicated by the replay stuff above. 
+ try { + long txid = + this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true); + if (txid != 0) sync(txid, durability); + writeEntry = walKey.getWriteEntry(); + } catch (IOException ioe) { + if (walKey != null) mvcc.complete(walKey.getWriteEntry()); + throw ioe; + } } - // ------------------------------------ // Acquire the latest mvcc number - // ---------------------------------- if (walKey == null) { - // If this is a skip wal operation just get the read point from mvcc - walKey = this.appendEmptyEdit(this.wal); - } - if (!isInReplay) { - writeEntry = walKey.getWriteEntry(); - mvccNum = writeEntry.getWriteNumber(); - } else { - mvccNum = batchOp.getReplaySequenceId(); + // If no walKey, then skipping WAL or some such. Begin an mvcc transaction to get a sequenceid. + writeEntry = mvcc.begin(); } - // ------------------------------------ // STEP 5. Write back to memstore - // Write to memstore. It is ok to write to memstore - // first without syncing the WAL because we do not roll - // forward the memstore MVCC. The MVCC will be moved up when - // the complete operation is done. These changes are not yet - // visible to scanners till we update the MVCC. The MVCC is - // moved only when the sync is complete. - // ---------------------------------- long addedSize = 0; for (int i = firstIndex; i < lastIndexExclusive; i++) { - if (batchOp.retCodeDetails[i].getOperationStatusCode() - != OperationStatusCode.NOT_RUN) { + if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) { continue; } - doRollBackMemstore = true; // If we have a failure, we need to clean what we wrote - addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, isInReplay); + addedSize += applyFamilyMapToMemstore(familyMaps[i], replay, + replay? batchOp.getReplaySequenceId(): writeEntry.getWriteNumber()); + } + + // STEP 6. Complete mvcc. + if (replay) { + this.mvcc.advanceTo(batchOp.getReplaySequenceId()); + } else if (writeEntry != null/*Can be null if in replay mode*/) { + mvcc.completeAndWait(writeEntry); + writeEntry = null; + } - // ------------------------------- - // STEP 6. Release row locks, etc. - // ------------------------------- + // STEP 7. Release row locks, etc. if (locked) { this.updatesLock.readLock().unlock(); locked = false; } releaseRowLocks(acquiredRowLocks); - // ------------------------- - // STEP 7. Sync wal. - // ------------------------- - if (txid != 0) { - syncOrDefer(txid, durability); - } - - doRollBackMemstore = false; // calling the post CP hook for batch mutation - if (!isInReplay && coprocessorHost != null) { + if (!replay && coprocessorHost != null) { MiniBatchOperationInProgress miniBatchOp = new MiniBatchOperationInProgress(batchOp.getMutationsForCoprocs(), batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive); coprocessorHost.postBatchMutate(miniBatchOp); } - // ------------------------------------------------------------------ - // STEP 8. Advance mvcc. This will make this put visible to scanners and getters.
- // ------------------------------------------------------------------ - if (writeEntry != null) { - mvcc.completeAndWait(writeEntry); - writeEntry = null; - } else if (isInReplay) { - // ensure that the sequence id of the region is at least as big as orig log seq id - mvcc.advanceTo(mvccNum); - } - for (int i = firstIndex; i < lastIndexExclusive; i ++) { if (batchOp.retCodeDetails[i] == OperationStatus.NOT_RUN) { batchOp.retCodeDetails[i] = OperationStatus.SUCCESS; } } - // ------------------------------------ - // STEP 9. Run coprocessor post hooks. This should be done after the wal is + // STEP 8. Run coprocessor post hooks. This should be done after the wal is // synced so that the coprocessor contract is adhered to. - // ------------------------------------ - if (!isInReplay && coprocessorHost != null) { + if (!replay && coprocessorHost != null) { for (int i = firstIndex; i < lastIndexExclusive; i++) { // only for successful puts if (batchOp.retCodeDetails[i].getOperationStatusCode() @@ -3307,18 +3186,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi success = true; return addedSize; } finally { - // if the wal sync was unsuccessful, remove keys from memstore - if (doRollBackMemstore) { - for (int j = 0; j < familyMaps.length; j++) { - for(List cells:familyMaps[j].values()) { - rollbackMemstore(cells); - } - } - if (writeEntry != null) mvcc.complete(writeEntry); - } else if (writeEntry != null) { - mvcc.completeAndWait(writeEntry); - } - + // Call complete rather than completeAndWait because we probably had error if walKey != null + if (writeEntry != null) mvcc.complete(writeEntry); if (locked) { this.updatesLock.readLock().unlock(); } @@ -3363,6 +3232,88 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } + private void appendCurrentNonces(final Mutation mutation, final boolean replay, + final WALEdit walEdit, final long now, final long currentNonceGroup, final long currentNonce) + throws IOException { + if (walEdit.isEmpty()) return; + if (!replay) throw new IOException("Multiple nonces per batch and not in replay"); + WALKey walKey = new WALKey(this.getRegionInfo().getEncodedNameAsBytes(), + this.htableDescriptor.getTableName(), now, mutation.getClusterIds(), + currentNonceGroup, currentNonce, mvcc); + this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true); + // Complete the mvcc transaction started down in append else it will block others + this.mvcc.complete(walKey.getWriteEntry()); + } + + private boolean checkBatchOp(BatchOperation batchOp, final int lastIndexExclusive, + final Map>[] familyMaps, final long now) + throws IOException { + boolean skip = false; + // Skip anything that "ran" already + if (batchOp.retCodeDetails[lastIndexExclusive].getOperationStatusCode() + != OperationStatusCode.NOT_RUN) { + return true; + } + Mutation mutation = batchOp.getMutation(lastIndexExclusive); + Map> familyMap = mutation.getFamilyCellMap(); + // store the family map reference to allow for mutations + familyMaps[lastIndexExclusive] = familyMap; + + try { + if (mutation instanceof Put) { + // Check the families in the put. If bad, skip this one. 
+ if (batchOp.isInReplay()) { + removeNonExistentColumnFamilyForReplay(familyMap); + } else { + checkFamilies(familyMap.keySet()); + } + checkTimestamps(mutation.getFamilyCellMap(), now); + } else { + prepareDelete((Delete)mutation); + } + checkRow(mutation.getRow(), "doMiniBatchMutation"); + } catch (NoSuchColumnFamilyException nscf) { + LOG.warn("No such column family in batch mutation", nscf); + batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus( + OperationStatusCode.BAD_FAMILY, nscf.getMessage()); + skip = true; + } catch (FailedSanityCheckException fsce) { + LOG.warn("Batch Mutation did not pass sanity check", fsce); + batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus( + OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage()); + skip = true; + } catch (WrongRegionException we) { + LOG.warn("Batch mutation had a row that does not belong to this region", we); + batchOp.retCodeDetails[lastIndexExclusive] = new OperationStatus( + OperationStatusCode.SANITY_CHECK_FAILURE, we.getMessage()); + skip = true; + } + return skip; + } + + /** + * During replay, there could exist column families which are removed between region server + * failure and replay. + */ + private void removeNonExistentColumnFamilyForReplay(final Map> familyMap) { + List nonExistentList = null; + for (byte[] family : familyMap.keySet()) { + if (!this.htableDescriptor.hasFamily(family)) { + if (nonExistentList == null) { + nonExistentList = new ArrayList(); + } + nonExistentList.add(family); + } + } + if (nonExistentList != null) { + for (byte[] family : nonExistentList) { + // Perhaps schema was changed between crash and replay + LOG.info("No family for " + Bytes.toString(family) + " omitted from replay."); + familyMap.remove(family); + } + } + } + /** * Returns effective durability from the passed durability and * the table descriptor. */ @@ -3371,93 +3322,82 @@ return d == Durability.USE_DEFAULT ? this.durability : d; } - //TODO, Think that gets/puts and deletes should be refactored a bit so that - //the getting of the lock happens before, so that you would just pass it into - //the methods. So in the case of checkAndMutate you could just do lockRow, - //get, put, unlockRow or something - @Override public boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier, - CompareOp compareOp, ByteArrayComparable comparator, Mutation w, + CompareOp compareOp, ByteArrayComparable comparator, Mutation mutation, boolean writeToWAL) throws IOException{ + checkMutationType(mutation, row); + return doCheckAndRowMutate(row, family, qualifier, compareOp, comparator, null, + mutation, writeToWAL); + } + + @Override + public boolean checkAndRowMutate(byte [] row, byte [] family, byte [] qualifier, + CompareOp compareOp, ByteArrayComparable comparator, RowMutations rm, + boolean writeToWAL) + throws IOException { + return doCheckAndRowMutate(row, family, qualifier, compareOp, comparator, rm, null, + writeToWAL); + } + + /** + * checkAndMutate and checkAndRowMutate are 90% the same. Rather than copy/paste, below has + * switches in the few places where there is deviation. + */ + private boolean doCheckAndRowMutate(byte [] row, byte [] family, byte [] qualifier, + CompareOp compareOp, ByteArrayComparable comparator, RowMutations rowMutations, + Mutation mutation, boolean writeToWAL) + throws IOException { + // Could do the below checks but seems wacky with two callers only. Just comment out for now.
+ // One caller passes a Mutation, the other passes RowMutation. Presume all good so we don't + // need these commented out checks. + // if (rowMutations == null && mutation == null) throw new DoNotRetryIOException("Both null"); + // if (rowMutations != null && mutation != null) throw new DoNotRetryIOException("Both set"); checkReadOnly(); - //TODO, add check for value length or maybe even better move this to the - //client if this becomes a global setting + // TODO, add check for value length also move this check to the client checkResources(); - boolean isPut = w instanceof Put; - if (!isPut && !(w instanceof Delete)) - throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action must " + - "be Put or Delete"); - if (!Bytes.equals(row, w.getRow())) { - throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action's " + - "getRow must match the passed row"); - } - startRegionOperation(); try { Get get = new Get(row); checkFamily(family); get.addColumn(family, qualifier); - // Lock row - note that doBatchMutate will relock this row if called RowLock rowLock = getRowLock(get.getRow()); - // wait for all previous transactions to complete (with lock held) - mvcc.await(); try { - if (this.getCoprocessorHost() != null) { + if (mutation != null && this.getCoprocessorHost() != null) { + // Call coprocessor. Boolean processed = null; - if (w instanceof Put) { + if (mutation instanceof Put) { processed = this.getCoprocessorHost().preCheckAndPutAfterRowLock(row, family, - qualifier, compareOp, comparator, (Put) w); - } else if (w instanceof Delete) { + qualifier, compareOp, comparator, (Put)mutation); + } else if (mutation instanceof Delete) { processed = this.getCoprocessorHost().preCheckAndDeleteAfterRowLock(row, family, - qualifier, compareOp, comparator, (Delete) w); + qualifier, compareOp, comparator, (Delete)mutation); } if (processed != null) { return processed; } } + // NOTE: We used to wait here until mvcc caught up: mvcc.await(); + // Supposition is that now all changes are done under row locks, then when we go to read, + // we'll get the latest on this row. List result = get(get, false); - - boolean valueIsNull = comparator.getValue() == null || - comparator.getValue().length == 0; + boolean valueIsNull = comparator.getValue() == null || comparator.getValue().length == 0; boolean matches = false; long cellTs = 0; if (result.size() == 0 && valueIsNull) { matches = true; - } else if (result.size() > 0 && result.get(0).getValueLength() == 0 && - valueIsNull) { + } else if (result.size() > 0 && result.get(0).getValueLength() == 0 && valueIsNull) { matches = true; cellTs = result.get(0).getTimestamp(); } else if (result.size() == 1 && !valueIsNull) { Cell kv = result.get(0); cellTs = kv.getTimestamp(); int compareResult = CellComparator.compareValue(kv, comparator); - switch (compareOp) { - case LESS: - matches = compareResult < 0; - break; - case LESS_OR_EQUAL: - matches = compareResult <= 0; - break; - case EQUAL: - matches = compareResult == 0; - break; - case NOT_EQUAL: - matches = compareResult != 0; - break; - case GREATER_OR_EQUAL: - matches = compareResult >= 0; - break; - case GREATER: - matches = compareResult > 0; - break; - default: - throw new RuntimeException("Unknown Compare op " + compareOp.name()); - } + matches = matches(compareOp, compareResult); } - //If matches put the new put or delete the new delete + // If matches put the new put or delete the new delete if (matches) { // We have acquired the row lock already. 
If the system clock is NOT monotonically // non-decreasing (see HBASE-14070) we should make sure that the mutation has a @@ -3466,16 +3406,27 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi long now = EnvironmentEdgeManager.currentTime(); long ts = Math.max(now, cellTs); // ensure write is not eclipsed byte[] byteTs = Bytes.toBytes(ts); - - if (w instanceof Put) { - updateCellTimestamps(w.getFamilyCellMap().values(), byteTs); + if (mutation != null) { + if (mutation instanceof Put) { + updateCellTimestamps(mutation.getFamilyCellMap().values(), byteTs); + } + // And else 'delete' is not needed since it already does a second get, and sets the + // timestamp from get (see prepareDeleteTimestamps). + } else { + for (Mutation m: rowMutations.getMutations()) { + if (m instanceof Put) { + updateCellTimestamps(m.getFamilyCellMap().values(), byteTs); + } + } + // And else 'delete' is not needed since it already does a second get, and sets the + // timestamp from get (see prepareDeleteTimestamps). + } + // All edits for the given row (across all column families) must happen atomically. + if (mutation != null) { + doBatchMutate(mutation); + } else { + mutateRow(rowMutations); } - // else delete is not needed since it already does a second get, and sets the timestamp - // from get (see prepareDeleteTimestamps). - - // All edits for the given row (across all column families) must - // happen atomically. - doBatchMutate(w); this.checkAndMutateChecksPassed.increment(); return true; } @@ -3489,113 +3440,54 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } - //TODO, Think that gets/puts and deletes should be refactored a bit so that - //the getting of the lock happens before, so that you would just pass it into - //the methods. 
So in the case of checkAndMutate you could just do lockRow, - //get, put, unlockRow or something + private void checkMutationType(final Mutation mutation, final byte [] row) + throws DoNotRetryIOException { + boolean isPut = mutation instanceof Put; + if (!isPut && !(mutation instanceof Delete)) { + throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action must be Put or Delete"); + } + if (!Bytes.equals(row, mutation.getRow())) { + throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action's getRow must match"); + } + } - @Override - public boolean checkAndRowMutate(byte [] row, byte [] family, byte [] qualifier, - CompareOp compareOp, ByteArrayComparable comparator, RowMutations rm, - boolean writeToWAL) throws IOException { - checkReadOnly(); - //TODO, add check for value length or maybe even better move this to the - //client if this becomes a global setting - checkResources(); + private boolean matches(final CompareOp compareOp, final int compareResult) { + boolean matches = false; + switch (compareOp) { + case LESS: + matches = compareResult < 0; + break; + case LESS_OR_EQUAL: + matches = compareResult <= 0; + break; + case EQUAL: + matches = compareResult == 0; + break; + case NOT_EQUAL: + matches = compareResult != 0; + break; + case GREATER_OR_EQUAL: + matches = compareResult >= 0; + break; + case GREATER: + matches = compareResult > 0; + break; + default: + throw new RuntimeException("Unknown Compare op " + compareOp.name()); + } + return matches; + } - startRegionOperation(); - try { - Get get = new Get(row); - checkFamily(family); - get.addColumn(family, qualifier); - // Lock row - note that doBatchMutate will relock this row if called - RowLock rowLock = getRowLock(get.getRow()); - // wait for all previous transactions to complete (with lock held) - mvcc.await(); - try { - List result = get(get, false); - - boolean valueIsNull = comparator.getValue() == null || - comparator.getValue().length == 0; - boolean matches = false; - long cellTs = 0; - if (result.size() == 0 && valueIsNull) { - matches = true; - } else if (result.size() > 0 && result.get(0).getValueLength() == 0 && - valueIsNull) { - matches = true; - cellTs = result.get(0).getTimestamp(); - } else if (result.size() == 1 && !valueIsNull) { - Cell kv = result.get(0); - cellTs = kv.getTimestamp(); - int compareResult = CellComparator.compareValue(kv, comparator); - switch (compareOp) { - case LESS: - matches = compareResult < 0; - break; - case LESS_OR_EQUAL: - matches = compareResult <= 0; - break; - case EQUAL: - matches = compareResult == 0; - break; - case NOT_EQUAL: - matches = compareResult != 0; - break; - case GREATER_OR_EQUAL: - matches = compareResult >= 0; - break; - case GREATER: - matches = compareResult > 0; - break; - default: - throw new RuntimeException("Unknown Compare op " + compareOp.name()); - } - } - //If matches put the new put or delete the new delete - if (matches) { - // We have acquired the row lock already. If the system clock is NOT monotonically - // non-decreasing (see HBASE-14070) we should make sure that the mutation has a - // larger timestamp than what was observed via Get. doBatchMutate already does this, but - // there is no way to pass the cellTs. See HBASE-14054. 
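The extracted matches() helper above replaces two identical inline switches. As a minimal standalone sketch of the semantics it encodes -- using a stand-in enum rather than HBase's CompareOp, and a plain int of the kind CellComparator.compareValue() produces -- it could look like this:

public class CompareOpSketch {
  // Stand-in for HBase's CompareFilter.CompareOp; case names mirror those in matches().
  enum Op { LESS, LESS_OR_EQUAL, EQUAL, NOT_EQUAL, GREATER_OR_EQUAL, GREATER }

  // Map a three-way comparison result (comparator vs. stored value) to pass/fail.
  static boolean matches(Op op, int compareResult) {
    switch (op) {
      case LESS:             return compareResult < 0;
      case LESS_OR_EQUAL:    return compareResult <= 0;
      case EQUAL:            return compareResult == 0;
      case NOT_EQUAL:        return compareResult != 0;
      case GREATER_OR_EQUAL: return compareResult >= 0;
      case GREATER:          return compareResult > 0;
      default: throw new IllegalArgumentException("Unknown compare op " + op);
    }
  }

  public static void main(String[] args) {
    System.out.println(matches(Op.LESS, "abc".compareTo("abd")));  // true: "abc" < "abd"
    System.out.println(matches(Op.EQUAL, "abc".compareTo("abc"))); // true
  }
}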
- long now = EnvironmentEdgeManager.currentTime(); - long ts = Math.max(now, cellTs); // ensure write is not eclipsed - byte[] byteTs = Bytes.toBytes(ts); - - for (Mutation w : rm.getMutations()) { - if (w instanceof Put) { - updateCellTimestamps(w.getFamilyCellMap().values(), byteTs); - } - // else delete is not needed since it already does a second get, and sets the timestamp - // from get (see prepareDeleteTimestamps). - } - - // All edits for the given row (across all column families) must - // happen atomically. - mutateRow(rm); - this.checkAndMutateChecksPassed.increment(); - return true; - } - this.checkAndMutateChecksFailed.increment(); - return false; - } finally { - rowLock.release(); - } - } finally { - closeRegionOperation(); - } - } - - private void doBatchMutate(Mutation mutation) throws IOException { - // Currently this is only called for puts and deletes, so no nonces. - OperationStatus[] batchMutate = this.batchMutate(new Mutation[]{mutation}); - if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.SANITY_CHECK_FAILURE)) { - throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg()); - } else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) { - throw new NoSuchColumnFamilyException(batchMutate[0].getExceptionMsg()); - } - } + private void doBatchMutate(Mutation mutation) throws IOException { + // Currently this is only called for puts and deletes, so no nonces. + OperationStatus[] batchMutate = this.batchMutate(new Mutation[]{mutation}); + if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.SANITY_CHECK_FAILURE)) { + throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg()); + } else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) { + throw new NoSuchColumnFamilyException(batchMutate[0].getExceptionMsg()); + } + } /** * Complete taking the snapshot on the region. Writes the region info and adds references to the @@ -3657,40 +3549,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi void rewriteCellTags(Map> familyMap, final Mutation m) { // Check if we have any work to do and early out otherwise // Update these checks as more logic is added here - if (m.getTTL() == Long.MAX_VALUE) { return; } // From this point we know we have some work to do - for (Map.Entry> e: familyMap.entrySet()) { List cells = e.getValue(); assert cells instanceof RandomAccess; int listSize = cells.size(); for (int i = 0; i < listSize; i++) { Cell cell = cells.get(i); - List newTags = new ArrayList(); - Iterator tagIterator = CellUtil.tagsIterator(cell); - - // Carry forward existing tags - - while (tagIterator.hasNext()) { - - // Add any filters or tag specific rewrites here - - newTags.add(tagIterator.next()); - } - - // Cell TTL handling - - // Check again if we need to add a cell TTL because early out logic - // above may change when there are more tag based features in core. - if (m.getTTL() != Long.MAX_VALUE) { - // Add a cell TTL tag - newTags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(m.getTTL()))); - } - + List newTags = TagUtil.carryForwardTags(null, cell); + newTags = TagUtil.carryForwardTTLTag(newTags, m.getTTL()); // Rewrite the cell with the updated set of tags cells.set(i, new TagRewriteCell(cell, TagUtil.fromList(newTags))); } @@ -3766,49 +3637,64 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * should already have locked updatesLock.readLock(). 
This also does * not check the families for validity. * - * @param familyMap Map of kvs per family - * @param mvccNum The MVCC for this transaction. - * @param isInReplay true when adding replayed KVs into memstore - * @return the additional memory usage of the memstore caused by the - * new entries. + * @param familyMap Map of Cells by family + * @return the additional memory usage of the memstore caused by the new entries. */ - private long applyFamilyMapToMemstore(Map> familyMap, - long mvccNum, boolean isInReplay) throws IOException { + private long applyFamilyMapToMemstore(Map> familyMap, boolean replay, + long sequenceId) + throws IOException { long size = 0; - for (Map.Entry> e : familyMap.entrySet()) { byte[] family = e.getKey(); List cells = e.getValue(); assert cells instanceof RandomAccess; - Store store = getStore(family); - int listSize = cells.size(); - for (int i=0; i < listSize; i++) { + size += applyToMemstore(getStore(family), cells, false, replay, sequenceId); + } + return size; + } + + /** + * @param delta If we are doing delta changes -- e.g. increment/append -- then this flag will be + * set; when set we will run operations that make sense in the increment/append scenario but + * that do not make sense otherwise. + * @return Memstore change in size on insert of these Cells. + * @see #applyToMemstore(Store, Cell, long) + */ + private long applyToMemstore(final Store store, final List cells, + final boolean delta, boolean replay, long sequenceId) + throws IOException { + // Any change in how we update Store/MemStore needs to also be done in other applyToMemstore!!!! + long size = 0; + boolean upsert = delta && store.getFamily().getMaxVersions() == 1; + int count = cells.size(); + if (upsert) { + size += store.upsert(cells, getSmallestReadPoint()); + } else { + for (int i = 0; i < count; i++) { Cell cell = cells.get(i); - if (cell.getSequenceId() == 0 || isInReplay) { - CellUtil.setSequenceId(cell, mvccNum); + // TODO: This looks wrong.. checking for sequenceid of zero is expensive!!!!! St.Ack + // When is it zero anyways? When replay? Then just rely on that flag. + if (cell.getSequenceId() == 0 || replay) { + CellUtil.setSequenceId(cell, sequenceId); } size += store.add(cell); } } - - return size; - } + return size; + } /** - * Remove all the keys listed in the map from the memstore. This method is - * called when a Put/Delete has updated memstore but subsequently fails to update - * the wal. This method is then invoked to rollback the memstore. + * @return Memstore change in size on insert of these Cells. + * @see #applyToMemstore(Store, List, boolean, boolean, long) */ - private void rollbackMemstore(List memstoreCells) { - int kvsRolledback = 0; - - for (Cell cell : memstoreCells) { - byte[] family = CellUtil.cloneFamily(cell); - Store store = getStore(family); - store.rollback(cell); - kvsRolledback++; + private long applyToMemstore(final Store store, final Cell cell, long sequenceId) + throws IOException { + // Any change in how we update Store/MemStore needs to also be done in other applyToMemstore!!!! 
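The consolidated applyToMemstore overloads centralize what callers used to open-code: deltas (increment/append) against a max-versions-1 family go through Store.upsert(), everything else is stamped with the mvcc write number and added Cell by Cell. A standalone sketch of just that decision, with toy stand-ins for Store and Cell rather than the HBase types:

import java.util.List;

public class ApplyToMemstoreSketch {
  // Toy stand-ins for the HBase Store and Cell types.
  interface Cell { long getSequenceId(); void setSequenceId(long id); }
  interface Store {
    int getMaxVersions();
    long add(Cell cell);                           // returns change in memstore size
    long upsert(List<Cell> cells, long readPoint);
  }

  static long applyToMemstore(Store store, List<Cell> cells, boolean delta, boolean replay,
      long sequenceId, long smallestReadPoint) {
    // Upsert only for increment/append on a single-version family: the new value replaces
    // the old in place instead of stacking another version.
    if (delta && store.getMaxVersions() == 1) {
      return store.upsert(cells, smallestReadPoint);
    }
    long size = 0;
    for (Cell cell : cells) {
      // Cells the WAL append already stamped keep their sequence id; replayed or
      // unstamped Cells get the write number passed in.
      if (cell.getSequenceId() == 0 || replay) {
        cell.setSequenceId(sequenceId);
      }
      size += store.add(cell);
    }
    return size; // caller folds this into the global memstore size
  }
}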
+ if (store == null) { + checkFamily(CellUtil.cloneFamily(cell)); + // Unreachable because checkFamily will throw exception } - LOG.debug("rollbackMemstore rolled back " + kvsRolledback); + return store.add(cell); } @Override @@ -3818,30 +3704,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } - /** - * During replay, there could exist column families which are removed between region server - * failure and replay - */ - private void removeNonExistentColumnFamilyForReplay( - final Map> familyMap) { - List nonExistentList = null; - for (byte[] family : familyMap.keySet()) { - if (!this.htableDescriptor.hasFamily(family)) { - if (nonExistentList == null) { - nonExistentList = new ArrayList(); - } - nonExistentList.add(family); - } - } - if (nonExistentList != null) { - for (byte[] family : nonExistentList) { - // Perhaps schema was changed between crash and replay - LOG.info("No family for " + Bytes.toString(family) + " omit from reply."); - familyMap.remove(family); - } - } - } - @Override public void checkTimestamps(final Map> familyMap, long now) throws FailedSanityCheckException { @@ -5483,12 +5345,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return true; } finally { if (wal != null && !storeFiles.isEmpty()) { - // write a bulk load event when not all hfiles are loaded + // @rite a bulk load event when not all hfiles are loaded try { WALProtos.BulkLoadDescriptor loadDescriptor = ProtobufUtil.toBulkLoadDescriptor( this.getRegionInfo().getTable(), ByteStringer.wrap(this.getRegionInfo().getEncodedNameAsBytes()), storeFiles, seqId); - WALUtil.writeBulkLoadMarkerAndSync(wal, this.htableDescriptor, getRegionInfo(), + WALUtil.writeBulkLoadMarkerAndSync(this.wal, getTableDesc(), getRegionInfo(), loadDescriptor, mvcc); } catch (IOException ioe) { if (this.rsServices != null) { @@ -5586,7 +5448,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // getSmallestReadPoint, before scannerReadPoints is updated. IsolationLevel isolationLevel = scan.getIsolationLevel(); synchronized(scannerReadPoints) { - this.readPt = getReadpoint(isolationLevel); + this.readPt = getReadPoint(isolationLevel); scannerReadPoints.put(this, this.readPt); } @@ -5750,7 +5612,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } // As the data is obtained from two independent heaps, we need to // ensure that result list is sorted, because Result relies on that. - Collections.sort(results, comparator); + sort(results, comparator); return moreValues; } @@ -6863,7 +6725,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi @Override public void processRowsWithLocks(RowProcessor processor, long timeout, long nonceGroup, long nonce) throws IOException { - for (byte[] row : processor.getRowsToLock()) { checkRow(row, "processRowsWithLocks"); } @@ -6871,23 +6732,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi checkReadOnly(); } checkResources(); - startRegionOperation(); WALEdit walEdit = new WALEdit(); - // 1. Run pre-process hook - try { - processor.preProcess(this, walEdit); - } catch (IOException e) { - closeRegionOperation(); - throw e; - } + // STEP 1. 
Run pre-process hook + preProcess(processor, walEdit); // Short circuit the read only case if (processor.readOnly()) { try { long now = EnvironmentEdgeManager.currentTime(); - doProcessRowWithTimeout( - processor, now, this, null, null, timeout); + doProcessRowWithTimeout(processor, now, this, null, null, timeout); processor.postProcess(this, walEdit, true); } finally { closeRegionOperation(); @@ -6895,118 +6749,81 @@ return; } - MultiVersionConcurrencyControl.WriteEntry writeEntry = null; boolean locked; - boolean walSyncSuccessful = false; List<RowLock> acquiredRowLocks; long addedSize = 0; List<Mutation> mutations = new ArrayList<Mutation>(); Collection<byte[]> rowsToLock = processor.getRowsToLock(); - long mvccNum = 0; - WALKey walKey = null; + // This is assigned by mvcc either explicitly in the below or in the guts of the WAL append + // when it assigns the edit a sequence id (a.k.a. the mvcc write number). + WriteEntry writeEntry = null; try { - // 2. Acquire the row lock(s) + // STEP 2. Acquire the row lock(s) acquiredRowLocks = new ArrayList<RowLock>(rowsToLock.size()); for (byte[] row : rowsToLock) { // Attempt to lock all involved rows, throw if any lock times out // use a writer lock for mixed reads and writes acquiredRowLocks.add(getRowLock(row)); } - // 3. Region lock + // STEP 3. Region lock lock(this.updatesLock.readLock(), acquiredRowLocks.size() == 0 ? 1 : acquiredRowLocks.size()); locked = true; - + boolean success = false; long now = EnvironmentEdgeManager.currentTime(); try { - // 4. Let the processor scan the rows, generate mutations and add - // waledits - doProcessRowWithTimeout( - processor, now, this, mutations, walEdit, timeout); - + // STEP 4. Let the processor scan the rows, generate mutations and add waledits + doProcessRowWithTimeout(processor, now, this, mutations, walEdit, timeout); if (!mutations.isEmpty()) { - - // 5. Call the preBatchMutate hook + // STEP 5. Call the preBatchMutate hook + processor.preBatchMutate(this, walEdit); - long txid = 0; - // 6. Append no sync + // STEP 6. Append and sync if walEdit has data to write out. if (!walEdit.isEmpty()) { - // we use HLogKey here instead of WALKey directly to support legacy coprocessors. - walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), - this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, - processor.getClusterIds(), nonceGroup, nonce, mvcc); - txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), - walKey, walEdit, false); - } - if(walKey == null){ - // since we use wal sequence Id as mvcc, for SKIP_WAL changes we need a "faked" WALEdit - // to get a sequence id assigned which is done by FSWALEntry#stampRegionSequenceId - walKey = this.appendEmptyEdit(this.wal); + writeEntry = doWALAppend(walEdit, getEffectiveDurability(processor.useDurability()), + processor.getClusterIds(), now, nonceGroup, nonce); + } else { + // We are here if WAL is being skipped. + writeEntry = this.mvcc.begin(); } - // 7. Start mvcc transaction - writeEntry = walKey.getWriteEntry(); - mvccNum = walKey.getSequenceId(); - - - - // 8. Apply to memstore + // STEP 7. Apply to memstore + long sequenceId = writeEntry.getWriteNumber(); for (Mutation m : mutations) { - // Handle any tag based cell features + // Handle any tag based cell features. + // TODO: Do we need to call rewriteCellTags down in applyToMemstore()? Why not before + // so tags go into WAL?
rewriteCellTags(m.getFamilyCellMap(), m); - for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) { Cell cell = cellScanner.current(); - CellUtil.setSequenceId(cell, mvccNum); - Store store = getStore(cell); - if (store == null) { - checkFamily(CellUtil.cloneFamily(cell)); - // unreachable + if (walEdit.isEmpty()) { + // If walEdit is empty, we put nothing in WAL. WAL stamps Cells with sequence id. + // If no WAL, need to stamp it here. + CellUtil.setSequenceId(cell, sequenceId); } - addedSize += store.add(cell); + Store store = getStore(cell); + addedSize += applyToMemstore(store, cell, sequenceId); } } + // STEP 8. Complete mvcc. + mvcc.completeAndWait(writeEntry); + writeEntry = null; - // 9. Release region lock + // STEP 9. Release region lock if (locked) { this.updatesLock.readLock().unlock(); locked = false; } - // 10. Release row lock(s) + // STEP 10. Release row lock(s) releaseRowLocks(acquiredRowLocks); - // 11. Sync edit log - if (txid != 0) { - syncOrDefer(txid, getEffectiveDurability(processor.useDurability())); - } - walSyncSuccessful = true; - // 12. call postBatchMutate hook + // STEP 11. call postBatchMutate hook processor.postBatchMutate(this); } + success = true; } finally { - // TODO: Make this method look like all other methods that are doing append/sync and - // memstore rollback such as append and doMiniBatchMutation. Currently it is a little - // different. Make them all share same code! - if (!mutations.isEmpty() && !walSyncSuccessful) { - LOG.warn("Wal sync failed. Roll back " + mutations.size() + - " memstore keyvalues for row(s):" + StringUtils.byteToHexString( - processor.getRowsToLock().iterator().next()) + "..."); - for (Mutation m : mutations) { - for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) { - Cell cell = cellScanner.current(); - getStore(cell).rollback(cell); - } - } - if (writeEntry != null) { - mvcc.complete(writeEntry); - writeEntry = null; - } - } - // 13. Roll mvcc forward - if (writeEntry != null) { - mvcc.completeAndWait(writeEntry); - } + // Call complete rather than completeAndWait because we probably had an error if writeEntry is still non-null + if (writeEntry != null) mvcc.complete(writeEntry); if (locked) { this.updatesLock.readLock().unlock(); } @@ -7014,18 +6831,26 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi releaseRowLocks(acquiredRowLocks); } - // 14. Run post-process hook - processor.postProcess(this, walEdit, walSyncSuccessful); - + // STEP 12. Run post-process hook + processor.postProcess(this, walEdit, success); } finally { closeRegionOperation(); - if (!mutations.isEmpty() && - isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize))) { + if (!mutations.isEmpty() && isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize))) { requestFlush(); } } } + private void preProcess(final RowProcessor<?,?> processor, final WALEdit walEdit) + throws IOException { + try { + processor.preProcess(this, walEdit); + } catch (IOException e) { + closeRegionOperation(); + throw e; + } + } + private void doProcessRowWithTimeout(final RowProcessor<?,?> processor, final long now, final HRegion region, @@ -7076,500 +6901,400 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } - /** - * @return The passed-in {@code tags} but with the tags from {@code cell} added. - */ - private static List carryForwardTags(final Cell cell, final List tags) { - if (cell.getTagsLength() <= 0) return tags; - List newTags = tags == null?
new ArrayList(): /*Append Tags*/tags; - Iterator i = CellUtil.tagsIterator(cell); - while (i.hasNext()) newTags.add(i.next()); - return newTags; + public Result append(Append append) throws IOException { + return append(append, HConstants.NO_NONCE, HConstants.NO_NONCE); } - /** - * Run a Get against passed in store on passed row, etc. - * @return Get result. - */ - private List doGet(final Store store, final byte [] row, - final Map.Entry> family, final TimeRange tr) - throws IOException { - // Sort the cells so that they match the order that they - // appear in the Get results. Otherwise, we won't be able to - // find the existing values if the cells are not specified - // in order by the client since cells are in an array list. - Collections.sort(family.getValue(), store.getComparator()); - // Get previous values for all columns in this family - Get get = new Get(row); - for (Cell cell : family.getValue()) { - get.addColumn(family.getKey(), CellUtil.cloneQualifier(cell)); - } - if (tr != null) get.setTimeRange(tr.getMin(), tr.getMax()); - return get(get, false); + @Override + public Result append(Append mutation, long nonceGroup, long nonce) throws IOException { + return doDelta(Operation.APPEND, mutation, nonceGroup, nonce, mutation.isReturnResults()); } - public Result append(Append append) throws IOException { - return append(append, HConstants.NO_NONCE, HConstants.NO_NONCE); + public Result increment(Increment increment) throws IOException { + return increment(increment, HConstants.NO_NONCE, HConstants.NO_NONCE); } - // TODO: There's a lot of boiler plate code identical to increment. - // We should refactor append and increment as local get-mutate-put - // transactions, so all stores only go through one code path for puts. - @Override - public Result append(Append mutate, long nonceGroup, long nonce) throws IOException { - Operation op = Operation.APPEND; - byte[] row = mutate.getRow(); - checkRow(row, op.toString()); - checkFamilies(mutate.getFamilyCellMap().keySet()); - boolean flush = false; - Durability durability = getEffectiveDurability(mutate.getDurability()); - boolean writeToWAL = durability != Durability.SKIP_WAL; - WALEdit walEdits = null; - List allKVs = new ArrayList(mutate.size()); - Map> tempMemstore = new HashMap>(); - long size = 0; - long txid = 0; + public Result increment(Increment mutation, long nonceGroup, long nonce) + throws IOException { + return doDelta(Operation.INCREMENT, mutation, nonceGroup, nonce, mutation.isReturnResults()); + } + + /** + * Add "deltas" to Cells. Deltas are increments or appends. Switch on op. + * + *

<p>If increment, add deltas to current values or if an append, then + * append the deltas to the current Cell values. + * + * <p>

Append and Increment code paths are mostly the same. They differ in just a few places. + * This method does the code path for increment and append and then in key spots, switches + * on the passed in op to do increment or append specific paths. + */ + private Result doDelta(Operation op, Mutation mutation, long nonceGroup, long nonce, + boolean returnResults) + throws IOException { checkReadOnly(); checkResources(); - // Lock row - startRegionOperation(op); + checkRow(mutation.getRow(), op.toString()); + checkFamilies(mutation.getFamilyCellMap().keySet()); this.writeRequestsCount.increment(); - RowLock rowLock = null; - WALKey walKey = null; - MultiVersionConcurrencyControl.WriteEntry writeEntry = null; - boolean doRollBackMemstore = false; + WriteEntry writeEntry = null; + startRegionOperation(op); + long accumulatedResultSize = 0; + List results = returnResults? new ArrayList(mutation.size()): null; + RowLock rowLock = getRowLock(mutation.getRow()); try { - rowLock = getRowLock(row); - assert rowLock != null; + lock(this.updatesLock.readLock()); try { - lock(this.updatesLock.readLock()); - try { - // Wait for all prior MVCC transactions to finish - while we hold the row lock - // (so that we are guaranteed to see the latest state when we do our Get) - mvcc.await(); - if (this.coprocessorHost != null) { - Result r = this.coprocessorHost.preAppendAfterRowLock(mutate); - if (r!= null) { - return r; - } - } - long now = EnvironmentEdgeManager.currentTime(); - // Process each family - for (Map.Entry> family : mutate.getFamilyCellMap().entrySet()) { - Store store = stores.get(family.getKey()); - List kvs = new ArrayList(family.getValue().size()); - - List results = doGet(store, row, family, null); - - // Iterate the input columns and update existing values if they were - // found, otherwise add new column initialized to the append value - - // Avoid as much copying as possible. We may need to rewrite and - // consolidate tags. Bytes are only copied once. 
- // Would be nice if KeyValue had scatter/gather logic - int idx = 0; - for (Cell cell : family.getValue()) { - Cell newCell; - Cell oldCell = null; - if (idx < results.size() - && CellUtil.matchingQualifier(results.get(idx), cell)) { - oldCell = results.get(idx); - long ts = Math.max(now, oldCell.getTimestamp()); - - // Process cell tags - // Make a union of the set of tags in the old and new KVs - List newTags = carryForwardTags(oldCell, new ArrayList()); - newTags = carryForwardTags(cell, newTags); - - // Cell TTL handling - - if (mutate.getTTL() != Long.MAX_VALUE) { - // Add the new TTL tag - newTags.add( - new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(mutate.getTTL()))); - } - - // Rebuild tags - byte[] tagBytes = TagUtil.fromList(newTags); - - // allocate an empty cell once - newCell = new KeyValue(row.length, cell.getFamilyLength(), - cell.getQualifierLength(), ts, KeyValue.Type.Put, - oldCell.getValueLength() + cell.getValueLength(), - tagBytes.length); - // copy in row, family, and qualifier - System.arraycopy(cell.getRowArray(), cell.getRowOffset(), - newCell.getRowArray(), newCell.getRowOffset(), cell.getRowLength()); - System.arraycopy(cell.getFamilyArray(), cell.getFamilyOffset(), - newCell.getFamilyArray(), newCell.getFamilyOffset(), - cell.getFamilyLength()); - System.arraycopy(cell.getQualifierArray(), cell.getQualifierOffset(), - newCell.getQualifierArray(), newCell.getQualifierOffset(), - cell.getQualifierLength()); - // copy in the value - CellUtil.copyValueTo(oldCell, newCell.getValueArray(), newCell.getValueOffset()); - System.arraycopy(cell.getValueArray(), cell.getValueOffset(), - newCell.getValueArray(), - newCell.getValueOffset() + oldCell.getValueLength(), - cell.getValueLength()); - // Copy in tag data - System.arraycopy(tagBytes, 0, newCell.getTagsArray(), newCell.getTagsOffset(), - tagBytes.length); - idx++; - } else { - // Append's KeyValue.Type==Put and ts==HConstants.LATEST_TIMESTAMP - CellUtil.updateLatestStamp(cell, now); - - // Cell TTL handling - - if (mutate.getTTL() != Long.MAX_VALUE) { - List newTags = new ArrayList(1); - newTags.add( - new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(mutate.getTTL()))); - // Add the new TTL tag - newCell = new TagRewriteCell(cell, TagUtil.fromList(newTags)); - } else { - newCell = cell; - } - } - - // Give coprocessors a chance to update the new cell - if (coprocessorHost != null) { - newCell = coprocessorHost.postMutationBeforeWAL(RegionObserver.MutationType.APPEND, - mutate, oldCell, newCell); - } - kvs.add(newCell); - - // Append update to WAL - if (writeToWAL) { - if (walEdits == null) { - walEdits = new WALEdit(); - } - walEdits.add(newCell); - } - } - - //store the kvs to the temporary memstore before writing WAL - tempMemstore.put(store, kvs); - } - - // Actually write to WAL now - if (walEdits != null && !walEdits.isEmpty()) { - if (writeToWAL) { - // Using default cluster id, as this can only happen in the originating - // cluster. A slave cluster receives the final value (not the delta) - // as a Put. - // we use HLogKey here instead of WALKey directly to support legacy coprocessors. 
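The block being removed here appended to the WAL without sync and, for SKIP_WAL mutations, pushed a faked empty edit through just to get a sequence id. Its replacement, doWALAppend (further down), folds append, sync, and the mvcc bookkeeping into one place, completing the mvcc transaction if the append or sync fails so readers are not left waiting. A toy rendering of that control flow, with stand-in interfaces rather than the real WAL/mvcc APIs:

import java.io.IOException;

public class WalAppendSketch {
  // Stand-ins for the WAL and mvcc; method shapes are illustrative only.
  interface Wal { long append(Object edit) throws IOException; void sync(long txid) throws IOException; }
  interface Mvcc { void complete(long writeNumber); }

  // Append assigns the write number (in HBase it rides on the WALKey's WriteEntry);
  // sync makes the edit durable; on failure we must still complete the mvcc
  // transaction, otherwise the read point can never advance past it.
  static long doWalAppend(Wal wal, Mvcc mvcc, Object edit) throws IOException {
    long writeNumber = -1;
    try {
      long txid = wal.append(edit);
      writeNumber = txid; // simplification: treat the txid as the assigned write number
      wal.sync(txid);
      return writeNumber;
    } catch (IOException ioe) {
      if (writeNumber != -1) {
        mvcc.complete(writeNumber); // unblock readers before rethrowing
      }
      throw ioe;
    }
  }
}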
- walKey = new HLogKey( - getRegionInfo().getEncodedNameAsBytes(), - this.htableDescriptor.getTableName(), - WALKey.NO_SEQUENCE_ID, - nonceGroup, - nonce, - mvcc); - txid = - this.wal.append(this.htableDescriptor, getRegionInfo(), walKey, walEdits, true); - } else { - recordMutationWithoutWal(mutate.getFamilyCellMap()); - } - } - if (walKey == null) { - // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned - walKey = this.appendEmptyEdit(this.wal); - } - - // now start my own transaction - writeEntry = walKey.getWriteEntry(); - - - // Actually write to Memstore now - if (!tempMemstore.isEmpty()) { - for (Map.Entry> entry : tempMemstore.entrySet()) { - Store store = entry.getKey(); - if (store.getFamily().getMaxVersions() == 1) { - // upsert if VERSIONS for this CF == 1 - // Is this right? It immediately becomes visible? St.Ack 20150907 - size += store.upsert(entry.getValue(), getSmallestReadPoint()); - } else { - // otherwise keep older versions around - for (Cell cell: entry.getValue()) { - CellUtil.setSequenceId(cell, writeEntry.getWriteNumber()); - size += store.add(cell); - doRollBackMemstore = true; - } - } - // We add to all KVs here whereas when doing increment, we do it - // earlier... why? - allKVs.addAll(entry.getValue()); - } - - size = this.addAndGetGlobalMemstoreSize(size); - flush = isFlushSize(size); - } - } finally { - this.updatesLock.readLock().unlock(); + Result cpResult = doCoprocessorPreCall(op, mutation); + if (cpResult != null) return cpResult; + Durability effectiveDurability = getEffectiveDurability(mutation.getDurability()); + Map<Store, List<Cell>> forMemStore = + new HashMap<Store, List<Cell>>(mutation.getFamilyCellMap().size()); + // Reckon Cells to apply to WAL -- in returned walEdit -- and what to add to memstore and + // what to return back to the client (in 'forMemStore' and 'results' respectively). + WALEdit walEdit = reckonDeltas(op, mutation, effectiveDurability, forMemStore, results); + // Actually write to WAL now if a walEdit to apply. + if (walEdit != null && !walEdit.isEmpty()) { + writeEntry = doWALAppend(walEdit, durability, nonceGroup, nonce); + } else { + // If walEdit is empty, it means we skipped the WAL; update counters and start an mvcc + // transaction. + recordMutationWithoutWal(mutation.getFamilyCellMap()); + writeEntry = mvcc.begin(); } - + // Now write to MemStore. Do it a column family at a time. + long sequenceId = writeEntry.getWriteNumber(); + for (Map.Entry<Store, List<Cell>> e: forMemStore.entrySet()) { + accumulatedResultSize += + applyToMemstore(e.getKey(), e.getValue(), true, false, sequenceId); + } + mvcc.completeAndWait(writeEntry); + writeEntry = null; } finally { - rowLock.release(); - rowLock = null; - } - // sync the transaction log outside the rowlock - if(txid != 0){ - syncOrDefer(txid, durability); + this.updatesLock.readLock().unlock(); } - doRollBackMemstore = false; + // If results is null, then client asked that we not return the calculated results. + return results != null? Result.create(results): null; } finally { - if (rowLock != null) { - rowLock.release(); - } - // if the wal sync was unsuccessful, remove keys from memstore - if (doRollBackMemstore) { - rollbackMemstore(allKVs); - if (writeEntry != null) mvcc.complete(writeEntry); - } else if (writeEntry != null) { - mvcc.completeAndWait(writeEntry); - } - + // Call complete always, even on success. doDelta is doing a Get READ_UNCOMMITTED when it goes + // to get current value under an exclusive lock so no need to wait to return to + // the client.
Means the only way to read-your-own-increment or append is to come in with + // a 0 increment. + if (writeEntry != null) mvcc.complete(writeEntry); + rowLock.release(); + // Request a cache flush if over the limit. Do it outside update lock. + if (isFlushSize(this.addAndGetGlobalMemstoreSize(accumulatedResultSize))) requestFlush(); closeRegionOperation(op); + if (this.metricsRegion != null) { + switch (op) { + case INCREMENT: + this.metricsRegion.updateIncrement(); + break; + case APPEND: + this.metricsRegion.updateAppend(); + break; + default: + break; + } + } } - - if (this.metricsRegion != null) { - this.metricsRegion.updateAppend(); - } - - if (flush) { - // Request a cache flush. Do it outside update lock. - requestFlush(); - } - - return mutate.isReturnResults() ? Result.create(allKVs) : null; } - public Result increment(Increment increment) throws IOException { - return increment(increment, HConstants.NO_NONCE, HConstants.NO_NONCE); + private WriteEntry doWALAppend(WALEdit walEdit, Durability durability, long nonceGroup, + long nonce) + throws IOException { + return doWALAppend(walEdit, durability, WALKey.EMPTY_UUIDS, System.currentTimeMillis(), + nonceGroup, nonce); } - // TODO: There's a lot of boiler plate code identical to append. - // We should refactor append and increment as local get-mutate-put - // transactions, so all stores only go through one code path for puts. - - // They are subtley different in quiet a few ways. This came out only - // after study. I am not sure that many of the differences are intentional. - // TODO: St.Ack 20150907 - - @Override - public Result increment(Increment mutation, long nonceGroup, long nonce) + /** + * @return writeEntry associated with this append + */ + private WriteEntry doWALAppend(WALEdit walEdit, Durability durability, List<UUID> clusterIds, + long now, long nonceGroup, long nonce) throws IOException { - Operation op = Operation.INCREMENT; - byte [] row = mutation.getRow(); - checkRow(row, op.toString()); - checkFamilies(mutation.getFamilyCellMap().keySet()); - boolean flush = false; - Durability durability = getEffectiveDurability(mutation.getDurability()); - boolean writeToWAL = durability != Durability.SKIP_WAL; - WALEdit walEdits = null; - List allKVs = new ArrayList(mutation.size()); - - Map> tempMemstore = new HashMap>(); - long size = 0; - long txid = 0; - checkReadOnly(); - checkResources(); - // Lock row - startRegionOperation(op); - this.writeRequestsCount.increment(); - RowLock rowLock = null; - WALKey walKey = null; - MultiVersionConcurrencyControl.WriteEntry writeEntry = null; - boolean doRollBackMemstore = false; - TimeRange tr = mutation.getTimeRange(); + WriteEntry writeEntry = null; + // Using default cluster id, as this can only happen in the originating cluster. + // A slave cluster receives the final value (not the delta) as a Put.
+ WALKey walKey = new WALKey(this.getRegionInfo().getEncodedNameAsBytes(), + this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, clusterIds, + nonceGroup, nonce, mvcc); try { - rowLock = getRowLock(row); - assert rowLock != null; - try { - lock(this.updatesLock.readLock()); - try { - // wait for all prior MVCC transactions to finish - while we hold the row lock - // (so that we are guaranteed to see the latest state) - mvcc.await(); - if (this.coprocessorHost != null) { - Result r = this.coprocessorHost.preIncrementAfterRowLock(mutation); - if (r != null) { - return r; - } - } - long now = EnvironmentEdgeManager.currentTime(); - // Process each family - for (Map.Entry> family: mutation.getFamilyCellMap().entrySet()) { - Store store = stores.get(family.getKey()); - List kvs = new ArrayList(family.getValue().size()); - - List results = doGet(store, row, family, tr); - - // Iterate the input columns and update existing values if they were - // found, otherwise add new column initialized to the increment amount - - // Avoid as much copying as possible. We may need to rewrite and - // consolidate tags. Bytes are only copied once. - // Would be nice if KeyValue had scatter/gather logic - int idx = 0; - // HERE WE DIVERGE FROM APPEND - List edits = family.getValue(); - for (int i = 0; i < edits.size(); i++) { - Cell cell = edits.get(i); - long amount = Bytes.toLong(CellUtil.cloneValue(cell)); - boolean noWriteBack = (amount == 0); - - List newTags = carryForwardTags(cell, new ArrayList()); - - Cell c = null; - long ts = now; - if (idx < results.size() && CellUtil.matchingQualifier(results.get(idx), cell)) { - c = results.get(idx); - ts = Math.max(now, c.getTimestamp()); - if(c.getValueLength() == Bytes.SIZEOF_LONG) { - amount += CellUtil.getValueAsLong(c); - } else { - // throw DoNotRetryIOException instead of IllegalArgumentException - throw new org.apache.hadoop.hbase.DoNotRetryIOException( - "Attempted to increment field that isn't 64 bits wide"); - } - // Carry tags forward from previous version - newTags = carryForwardTags(c, newTags); - if (i < (edits.size() - 1) && !CellUtil.matchingQualifier(cell, edits.get(i + 1))) { - idx++; - } - } - - // Append new incremented KeyValue to list - byte[] q = CellUtil.cloneQualifier(cell); - byte[] val = Bytes.toBytes(amount); - - // Add the TTL tag if the mutation carried one - if (mutation.getTTL() != Long.MAX_VALUE) { - newTags.add( - new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(mutation.getTTL()))); - } - - Cell newKV = new KeyValue(row, 0, row.length, - family.getKey(), 0, family.getKey().length, - q, 0, q.length, - ts, - KeyValue.Type.Put, - val, 0, val.length, - newTags); - - // Give coprocessors a chance to update the new cell - if (coprocessorHost != null) { - newKV = coprocessorHost.postMutationBeforeWAL( - RegionObserver.MutationType.INCREMENT, mutation, c, newKV); - } - allKVs.add(newKV); - - if (!noWriteBack) { - kvs.add(newKV); - - // Prepare WAL updates - if (writeToWAL) { - if (walEdits == null) { - walEdits = new WALEdit(); - } - walEdits.add(newKV); - } - } - } - - //store the kvs to the temporary memstore before writing WAL - if (!kvs.isEmpty()) { - tempMemstore.put(store, kvs); - } - } - - // Actually write to WAL now - if (walEdits != null && !walEdits.isEmpty()) { - if (writeToWAL) { - // Using default cluster id, as this can only happen in the originating - // cluster. A slave cluster receives the final value (not the delta) - // as a Put. 
- // we use HLogKey here instead of WALKey directly to support legacy coprocessors. - walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), - this.htableDescriptor.getTableName(), - WALKey.NO_SEQUENCE_ID, - nonceGroup, - nonce, - mvcc); - txid = this.wal.append(this.htableDescriptor, this.getRegionInfo(), - walKey, walEdits, true); - } else { - recordMutationWithoutWal(mutation.getFamilyCellMap()); - } - } - if (walKey == null) { - // Append a faked WALEdit in order for SKIP_WAL updates to get mvccNum assigned - walKey = this.appendEmptyEdit(this.wal); - } + long txid = + this.wal.append(this.htableDescriptor, this.getRegionInfo(), walKey, walEdit, true); + // Call sync on our edit. + if (txid != 0) sync(txid, durability); + writeEntry = walKey.getWriteEntry(); + } catch (IOException ioe) { + if (walKey != null) mvcc.complete(walKey.getWriteEntry()); + throw ioe; + } + return writeEntry; + } - // now start my own transaction - writeEntry = walKey.getWriteEntry(); + /** + * Do coprocessor pre-increment or pre-append call. + * @return Result returned out of the coprocessor, which means bypass all further processing and + * return the proffered Result instead, or null which means proceed. + */ + private Result doCoprocessorPreCall(final Operation op, final Mutation mutation) + throws IOException { + Result result = null; + if (this.coprocessorHost != null) { + switch(op) { + case INCREMENT: + result = this.coprocessorHost.preIncrementAfterRowLock((Increment)mutation); + break; + case APPEND: + result = this.coprocessorHost.preAppendAfterRowLock((Append)mutation); + break; + default: throw new UnsupportedOperationException(op.toString()); + } + } + return result; + } - // Actually write to Memstore now - if (!tempMemstore.isEmpty()) { - for (Map.Entry> entry : tempMemstore.entrySet()) { - Store store = entry.getKey(); - if (store.getFamily().getMaxVersions() == 1) { - // upsert if VERSIONS for this CF == 1 - // Is this right? It immediately becomes visible? St.Ack 20150907 - size += store.upsert(entry.getValue(), getSmallestReadPoint()); - } else { - // otherwise keep older versions around - for (Cell cell : entry.getValue()) { - CellUtil.setSequenceId(cell, writeEntry.getWriteNumber()); - size += store.add(cell); - doRollBackMemstore = true; - } - } - } - size = this.addAndGetGlobalMemstoreSize(size); - flush = isFlushSize(size); + /** + * Reckon the Cells to apply to WAL, memstore, and to return to the Client; these Sets are not + * always the same dependent on whether to write WAL or if the amount to increment is zero (in + * this case we write back nothing, just return latest Cell value to the client). + * + * @param results Fill in here what goes back to the Client if it is non-null (if null, client + * doesn't want results). + * @param forMemStore Fill in here what to apply to the MemStore (by Store). + * @return A WALEdit to apply to WAL or null if we are to skip the WAL. + */ + private WALEdit reckonDeltas(final Operation op, final Mutation mutation, + final Durability effectiveDurability, final Map> forMemStore, + final List results) + throws IOException { + WALEdit walEdit = null; + long now = EnvironmentEdgeManager.currentTime(); + final boolean writeToWAL = effectiveDurability != Durability.SKIP_WAL; + // Process a Store/family at a time. 
+ for (Map.Entry<byte[], List<Cell>> entry: mutation.getFamilyCellMap().entrySet()) { + final byte [] columnFamilyName = entry.getKey(); + List<Cell> deltas = entry.getValue(); + Store store = this.stores.get(columnFamilyName); + // Reckon for the Store what to apply to WAL and MemStore. + List<Cell> toApply = + reckonDeltasByStore(store, op, mutation, effectiveDurability, now, deltas, results); + if (!toApply.isEmpty()) { + forMemStore.put(store, toApply); + if (writeToWAL) { + if (walEdit == null) { + walEdit = new WALEdit(); } + walEdit.getCells().addAll(toApply); } } } + return walEdit; + } + + /** + * Reckon the Cells to apply to WAL, memstore, and to return to the Client in passed + * column family/Store. + * + * Does Get of current value and then adds passed in deltas for this Store returning the result. + * + * @param op Whether Increment or Append + * @param mutation The encompassing Mutation object + * @param deltas Changes to apply to this Store; either increment amount or data to append + * @param results In here we accumulate all the Cells we are to return to the client; this List + * can be larger than what we return in case where delta is zero; i.e. don't write + * out new values, just return current value. If null, client doesn't want results returned. + * @return Resulting Cells after deltas have been applied to current + * values. Side effect is our filling out of the results List. + */ + private List<Cell> reckonDeltasByStore(final Store store, final Operation op, + final Mutation mutation, final Durability effectiveDurability, final long now, + final List<Cell> deltas, final List<Cell> results) + throws IOException { + byte [] columnFamily = store.getFamily().getName(); + List<Cell> toApply = new ArrayList<Cell>(deltas.size()); + // Get previous values for all columns in this family. + List<Cell> currentValues = get(mutation, store, deltas, + null/*Default IsolationLevel*/, + op == Operation.INCREMENT? ((Increment)mutation).getTimeRange(): null); + // Iterate the input columns and update existing values if they were found, otherwise + // add new column initialized to the delta amount + int currentValuesIndex = 0; + for (int i = 0; i < deltas.size(); i++) { + Cell delta = deltas.get(i); + Cell currentValue = null; + if (currentValuesIndex < currentValues.size() && + CellUtil.matchingQualifier(currentValues.get(currentValuesIndex), delta)) { + currentValue = currentValues.get(currentValuesIndex); + if (i < (deltas.size() - 1) && !CellUtil.matchingQualifier(delta, deltas.get(i + 1))) { + currentValuesIndex++; + } + } + // Switch on whether this is an increment or an append, building the new Cell to apply. + Cell newCell = null; + MutationType mutationType = null; + boolean apply = true; + switch (op) { + case INCREMENT: + mutationType = MutationType.INCREMENT; + // If delta amount to apply is 0, don't write WAL or MemStore. + long deltaAmount = getLongValue(delta); + apply = deltaAmount != 0; + newCell = reckonIncrement(delta, deltaAmount, currentValue, columnFamily, now, + (Increment)mutation); + break; + case APPEND: + mutationType = MutationType.APPEND; + // Always apply Append. TODO: Does empty delta value mean reset Cell? It seems to. + newCell = reckonAppend(delta, currentValue, now, (Append)mutation); + break; + default: throw new UnsupportedOperationException(op.toString()); + } + + // Give coprocessors a chance to update the new cell + if (coprocessorHost != null) { + newCell = + coprocessorHost.postMutationBeforeWAL(mutationType, mutation, currentValue, newCell); } + // If apply, we need to update memstore/WAL with new value; add it toApply. + if (apply) { + toApply.add(newCell); } + // Add to results to get returned to the Client. If null, client does not want results. + if (results != null) { + results.add(newCell); } } + return toApply; + } + + /** + * Calculate new Increment Cell. + * @return New Increment Cell with delta applied to currentValue if currentValue is not null; + * otherwise, a new Cell with the delta set as its value. + */ + private Cell reckonIncrement(final Cell delta, final long deltaAmount, final Cell currentValue, + byte [] columnFamily, final long now, Mutation mutation) + throws IOException { + // Forward any tags found on the delta. + List<Tag> tags = TagUtil.carryForwardTags(delta); + long newValue = deltaAmount; + long ts = now; + if (currentValue != null) { + tags = TagUtil.carryForwardTags(tags, currentValue); + ts = Math.max(now, currentValue.getTimestamp()); + newValue += getLongValue(currentValue); + } + // Now make up the new Cell. TODO: FIX. This is carnal knowledge of how KeyValues are made... + // doesn't work well with offheaping or if we are doing a different Cell type. + byte [] incrementAmountInBytes = Bytes.toBytes(newValue); + tags = TagUtil.carryForwardTTLTag(tags, mutation.getTTL()); + byte [] row = mutation.getRow(); + return new KeyValue(row, 0, row.length, + columnFamily, 0, columnFamily.length, + delta.getQualifierArray(), delta.getQualifierOffset(), delta.getQualifierLength(), + ts, KeyValue.Type.Put, + incrementAmountInBytes, 0, incrementAmountInBytes.length, + tags); + } + + private Cell reckonAppend(final Cell delta, final Cell currentValue, final long now, + Append mutation) + throws IOException { + // Forward any tags found on the delta. + List<Tag> tags = TagUtil.carryForwardTags(delta); + long ts = now; + Cell newCell = null; + byte [] row = mutation.getRow(); + if (currentValue != null) { + tags = TagUtil.carryForwardTags(tags, currentValue); + ts = Math.max(now, currentValue.getTimestamp()); + tags = TagUtil.carryForwardTTLTag(tags, mutation.getTTL()); + byte[] tagBytes = TagUtil.fromList(tags); + // Allocate an empty cell and copy in all parts. + // TODO: This is intimate knowledge of how a KeyValue is made. Undo!!! Prevents our doing + // other Cell types. Copying on-heap too if an off-heap Cell. + newCell = new KeyValue(row.length, delta.getFamilyLength(), + delta.getQualifierLength(), ts, KeyValue.Type.Put, + delta.getValueLength() + currentValue.getValueLength(), + tagBytes == null?
0: tagBytes.length); + // Copy in row, family, and qualifier + System.arraycopy(row, 0, newCell.getRowArray(), newCell.getRowOffset(), row.length); + System.arraycopy(delta.getFamilyArray(), delta.getFamilyOffset(), + newCell.getFamilyArray(), newCell.getFamilyOffset(), delta.getFamilyLength()); + System.arraycopy(delta.getQualifierArray(), delta.getQualifierOffset(), + newCell.getQualifierArray(), newCell.getQualifierOffset(), delta.getQualifierLength()); + // Copy in the value + CellUtil.copyValueTo(currentValue, newCell.getValueArray(), newCell.getValueOffset()); + System.arraycopy(delta.getValueArray(), delta.getValueOffset(), + newCell.getValueArray(), newCell.getValueOffset() + currentValue.getValueLength(), + delta.getValueLength()); + // Copy in tag data + if (tagBytes != null) { + System.arraycopy(tagBytes, 0, + newCell.getTagsArray(), newCell.getTagsOffset(), tagBytes.length); } + } else { + // Append's KeyValue.Type==Put and ts==HConstants.LATEST_TIMESTAMP + CellUtil.updateLatestStamp(delta, now); + newCell = delta; + tags = TagUtil.carryForwardTTLTag(tags, mutation.getTTL()); + if (tags != null) { + newCell = new TagRewriteCell(delta, TagUtil.fromList(tags)); + } + } + return newCell; + } + + /** + * @return Get the long out of the passed in Cell + */ + private static long getLongValue(final Cell cell) throws DoNotRetryIOException { + int len = cell.getValueLength(); + if (len != Bytes.SIZEOF_LONG) { + // throw DoNotRetryIOException instead of IllegalArgumentException + throw new DoNotRetryIOException("Field is not a long, it's " + len + " bytes wide"); } + return Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), len); + } - if (flush) { - // Request a cache flush. Do it outside update lock. - requestFlush(); + /** + * Do a specific Get on passed columnFamily and column qualifiers. + * @param mutation Mutation we are doing this Get for. + * @param columnFamily Which column family on row (TODO: Go all Gets in one go) + * @param coordinates Cells from mutation used as coordinates applied to Get. + * @return Return list of Cells found. + */ + private List get(final Mutation mutation, final Store store, + final List coordinates, final IsolationLevel isolation, final TimeRange tr) + throws IOException { + // Sort the cells so that they match the order that they appear in the Get results. Otherwise, + // we won't be able to find the existing values if the cells are not specified in order by the + // client since cells are in an array list. + // TODO: I don't get why we are sorting. St.Ack 20150107 + sort(coordinates, store.getComparator()); + Get get = new Get(mutation.getRow()); + if (isolation != null) { + get.setIsolationLevel(isolation); + } + for (Cell cell: coordinates) { + get.addColumn(store.getFamily().getName(), CellUtil.cloneQualifier(cell)); + } + // Increments carry time range. If an Increment instance, put it on the Get. + if (tr != null) { + get.setTimeRange(tr.getMin(), tr.getMax()); } - return mutation.isReturnResults() ? 
Result.create(allKVs) : null; + return get(get, false); + } + + /** + * @return Sorted list of cells using comparator + */ + private static List sort(List cells, final Comparator comparator) { + Collections.sort(cells, comparator); + return cells; } // @@ -7588,7 +7313,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi public static final long FIXED_OVERHEAD = ClassSize.align( ClassSize.OBJECT + ClassSize.ARRAY + - 44 * ClassSize.REFERENCE + 3 * Bytes.SIZEOF_INT + + 44 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT + (14 * Bytes.SIZEOF_LONG) + 5 * Bytes.SIZEOF_BOOLEAN); @@ -7625,20 +7350,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return heapSize; } - /* - * This method calls System.exit. - * @param message Message to print out. May be null. - */ - private static void printUsageAndExit(final String message) { - if (message != null && message.length() > 0) System.out.println(message); - System.out.println("Usage: HRegion CATALOG_TABLE_DIR [major_compact]"); - System.out.println("Options:"); - System.out.println(" major_compact Pass this option to major compact " + - "passed region."); - System.out.println("Default outputs scan of passed region."); - System.exit(1); - } - @Override public boolean registerService(Service instance) { /* @@ -7712,53 +7423,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return responseBuilder.build(); } - /* - * Process table. - * Do major compaction or list content. - * @throws IOException - */ - private static void processTable(final FileSystem fs, final Path p, - final WALFactory walFactory, final Configuration c, - final boolean majorCompact) - throws IOException { - HRegion region; - FSTableDescriptors fst = new FSTableDescriptors(c); - // Currently expects tables have one region only. 
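The FIXED_OVERHEAD change above drops one SIZEOF_INT from HRegion's heap accounting (a sequence-id related int field went away with this patch). As a rough worked example of what that expression evaluates to -- assuming typical 64-bit layout constants (OBJECT=16, ARRAY=24, REFERENCE=8) and 8-byte alignment; the real numbers come from org.apache.hadoop.hbase.util.ClassSize at runtime:

public class FixedOverheadSketch {
  // Assumed 64-bit layout constants; HBase computes these via ClassSize at runtime.
  static final int OBJECT = 16, ARRAY = 24, REFERENCE = 8;
  static final int SIZEOF_INT = 4, SIZEOF_LONG = 8, SIZEOF_BOOLEAN = 1;

  // Round up to an 8-byte boundary, as ClassSize.align does.
  static long align(long num) {
    return (num + 7) & ~7L;
  }

  public static void main(String[] args) {
    // Mirrors: align(OBJECT + ARRAY + 44*REFERENCE + 2*INT + 14*LONG + 5*BOOLEAN)
    long overhead = align(OBJECT + ARRAY + 44 * REFERENCE
        + 2 * SIZEOF_INT + 14 * SIZEOF_LONG + 5 * SIZEOF_BOOLEAN);
    System.out.println(overhead); // 16 + 24 + 352 + 8 + 112 + 5 = 517, aligned to 520
  }
}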
- if (FSUtils.getTableName(p).equals(TableName.META_TABLE_NAME)) { - final WAL wal = walFactory.getMetaWAL( - HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes()); - region = HRegion.newHRegion(p, wal, fs, c, - HRegionInfo.FIRST_META_REGIONINFO, - fst.get(TableName.META_TABLE_NAME), null); - } else { - throw new IOException("Not a known catalog table: " + p.toString()); - } - try { - region.mvcc.advanceTo(region.initialize(null)); - if (majorCompact) { - region.compact(true); - } else { - // Default behavior - Scan scan = new Scan(); - // scan.addFamily(HConstants.CATALOG_FAMILY); - RegionScanner scanner = region.getScanner(scan); - try { - List kvs = new ArrayList(); - boolean done; - do { - kvs.clear(); - done = scanner.next(kvs); - if (kvs.size() > 0) LOG.info(kvs); - } while (done); - } finally { - scanner.close(); - } - } - } finally { - region.close(); - } - } - boolean shouldForceSplit() { return this.splitRequest; } @@ -7860,26 +7524,26 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi justification="Intentional") public void startRegionOperation(Operation op) throws IOException { switch (op) { - case GET: // read operations - case SCAN: - checkReadsEnabled(); - case INCREMENT: // write operations - case APPEND: - case SPLIT_REGION: - case MERGE_REGION: - case PUT: - case DELETE: - case BATCH_MUTATE: - case COMPACT_REGION: - // when a region is in recovering state, no read, split or merge is allowed - if (isRecovering() && (this.disallowWritesInRecovering || + case GET: // read operations + case SCAN: + checkReadsEnabled(); + case INCREMENT: // write operations + case APPEND: + case SPLIT_REGION: + case MERGE_REGION: + case PUT: + case DELETE: + case BATCH_MUTATE: + case COMPACT_REGION: + // when a region is in recovering state, no read, split or merge is allowed + if (isRecovering() && (this.disallowWritesInRecovering || (op != Operation.PUT && op != Operation.DELETE && op != Operation.BATCH_MUTATE))) { - throw new RegionInRecoveryException(getRegionInfo().getRegionNameAsString() + - " is recovering; cannot take reads"); - } - break; - default: - break; + throw new RegionInRecoveryException(getRegionInfo().getRegionNameAsString() + + " is recovering; cannot take reads"); + } + break; + default: + break; } if (op == Operation.MERGE_REGION || op == Operation.SPLIT_REGION || op == Operation.COMPACT_REGION) { @@ -8011,12 +7675,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } /** - * Calls sync with the given transaction ID if the region's table is not - * deferring it. + * Calls sync with the given transaction ID * @param txid should sync up to which transaction * @throws IOException If anything goes wrong with DFS */ - private void syncOrDefer(long txid, Durability durability) throws IOException { + private void sync(long txid, Durability durability) throws IOException { if (this.getRegionInfo().isMetaRegion()) { this.wal.sync(txid); } else { @@ -8077,45 +7740,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } }; - /** - * Facility for dumping and compacting catalog tables. - * Only does catalog tables since these are only tables we for sure know - * schema on. For usage run: - *

@@ -8077,45 +7740,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
   };

-  /**
-   * Facility for dumping and compacting catalog tables.
-   * Only does catalog tables since these are only tables we for sure know
-   * schema on.  For usage run:
-   * <pre>
-   *   ./bin/hbase org.apache.hadoop.hbase.regionserver.HRegion
-   * </pre>
-   * @throws IOException
-   */
-  public static void main(String[] args) throws IOException {
-    if (args.length < 1) {
-      printUsageAndExit(null);
-    }
-    boolean majorCompact = false;
-    if (args.length > 1) {
-      if (!args[1].toLowerCase().startsWith("major")) {
-        printUsageAndExit("ERROR: Unrecognized option <" + args[1] + ">");
-      }
-      majorCompact = true;
-    }
-    final Path tableDir = new Path(args[0]);
-    final Configuration c = HBaseConfiguration.create();
-    final FileSystem fs = FileSystem.get(c);
-    final Path logdir = new Path(c.get("hbase.tmp.dir"));
-    final String logname = "wal" + FSUtils.getTableName(tableDir) + System.currentTimeMillis();
-
-    final Configuration walConf = new Configuration(c);
-    FSUtils.setRootDir(walConf, logdir);
-    final WALFactory wals = new WALFactory(walConf, null, logname);
-    try {
-      processTable(fs, tableDir, wals, c, majorCompact);
-    } finally {
-      wals.close();
-      // TODO: is this still right?
-      BlockCache bc = new CacheConfig(c).getBlockCache();
-      if (bc != null) bc.shutdown();
-    }
-  }
-
   @Override
   public long getOpenSeqNum() {
     return this.openSeqNum;
@@ -8153,39 +7777,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     assert newValue >= 0;
   }

-  /**
-   * Do not change this sequence id.
-   * @return sequenceId
-   */
   @VisibleForTesting
-  public long getSequenceId() {
-    return this.mvcc.getReadPoint();
-  }
-
-
-  /**
-   * Append a faked WALEdit in order to get a long sequence number and wal syncer will just ignore
-   * the WALEdit append later.
-   * @param wal
-   * @return Return the key used appending with no sync and no append.
-   * @throws IOException
-   */
-  private WALKey appendEmptyEdit(final WAL wal) throws IOException {
-    // we use HLogKey here instead of WALKey directly to support legacy coprocessors.
-    @SuppressWarnings("deprecation")
-    WALKey key = new HLogKey(getRegionInfo().getEncodedNameAsBytes(),
-      getRegionInfo().getTable(), WALKey.NO_SEQUENCE_ID, 0, null,
-      HConstants.NO_NONCE, HConstants.NO_NONCE, getMVCC());
-
-    // Call append but with an empty WALEdit.  The returned sequence id will not be associated
-    // with any edit and we can be sure it went in after all outstanding appends.
-    try {
-      wal.append(getTableDesc(), getRegionInfo(), key, WALEdit.EMPTY_WALEDIT, false);
-    } catch (Throwable t) {
-      // If exception, our mvcc won't get cleaned up by client, so do it here.
-      getMVCC().complete(key.getWriteEntry());
-    }
-    return key;
+  public long getReadPoint() {
+    return getReadPoint(IsolationLevel.READ_COMMITTED);
   }

   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
index 5fe2061..86a3c3d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
@@ -127,10 +127,7 @@ class FSWALEntry extends Entry {
       }
     }

-    // This has to stay in this order
-    WALKey key = getKey();
-    key.setLogSeqNum(regionSequenceId);
-    key.setWriteEntry(we);
+    getKey().setWriteEntry(we);
     return regionSequenceId;
   }

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
index c094ced..7c40323 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogKey.java
@@ -166,7 +166,7 @@ public class HLogKey extends WALKey implements Writable {
           this.tablename.getName().length, out, compressionContext.tableDict);
     }
-    out.writeLong(this.logSeqNum);
+    out.writeLong(getSequenceId());
     out.writeLong(this.writeTime);
     // Don't need to write the clusters information as we are using protobufs from 0.95
     // Writing only the first clusterId for testing the legacy read
@@ -213,7 +213,7 @@ public class HLogKey extends WALKey implements Writable {
       tablenameBytes = Compressor.readCompressed(in, compressionContext.tableDict);
     }

-    this.logSeqNum = in.readLong();
+    setSequenceId(in.readLong());
    this.writeTime = in.readLong();

    this.clusterIds.clear();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
index c89a466..f268422 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
@@ -37,9 +37,9 @@ import org.apache.hadoop.hbase.wal.WALKey;
 import com.google.protobuf.TextFormat;

 /**
- * Helper methods to ease Region Server integration with the write ahead log.
+ * Helper methods to ease Region Server integration with the Write Ahead Log (WAL).
  * Note that methods in this class specifically should not require access to anything
- * other than the API found in {@link WAL}.
+ * other than the API found in {@link WAL}. For internal use only.
  */
 @InterfaceAudience.Private
 public class WALUtil {
@@ -51,86 +51,108 @@ public class WALUtil {

   /**
    * Write the marker that a compaction has succeeded and is about to be committed.
-   * This provides info to the HMaster to allow it to recover the compaction if
-   * this regionserver dies in the middle (This part is not yet implemented). It also prevents
-   * the compaction from finishing if this regionserver has already lost its lease on the log.
+   * This provides info to the HMaster to allow it to recover the compaction if this regionserver
+   * dies in the middle. It also prevents the compaction from finishing if this regionserver has
+   * already lost its lease on the log.
+   *
+   * <p>This write is for internal use only. Not for external client consumption.
    * @param mvcc Used by WAL to get sequence Id for the waledit.
    */
-  public static long writeCompactionMarker(WAL wal, HTableDescriptor htd, HRegionInfo hri,
+  public static WALKey writeCompactionMarker(WAL wal, HTableDescriptor htd, HRegionInfo hri,
       final CompactionDescriptor c, MultiVersionConcurrencyControl mvcc) throws IOException {
-    long trx = writeMarker(wal, htd, hri, WALEdit.createCompaction(hri, c), mvcc, true);
+    WALKey walKey = writeMarker(wal, htd, hri, WALEdit.createCompaction(hri, c), mvcc);
     if (LOG.isTraceEnabled()) {
       LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c));
     }
-    return trx;
+    return walKey;
   }

   /**
    * Write a flush marker indicating a start / abort or a complete of a region flush
+   *
+   * <p>This write is for internal use only. Not for external client consumption.
    */
-  public static long writeFlushMarker(WAL wal, HTableDescriptor htd, HRegionInfo hri,
+  public static WALKey writeFlushMarker(WAL wal, HTableDescriptor htd, HRegionInfo hri,
       final FlushDescriptor f, boolean sync, MultiVersionConcurrencyControl mvcc)
   throws IOException {
-    long trx = writeMarker(wal, htd, hri, WALEdit.createFlushWALEdit(hri, f), mvcc, sync);
+    WALKey walKey =
+        doFullAppendTransaction(wal, htd, hri, WALEdit.createFlushWALEdit(hri, f), mvcc, sync);
     if (LOG.isTraceEnabled()) {
       LOG.trace("Appended flush marker " + TextFormat.shortDebugString(f));
     }
-    return trx;
+    return walKey;
   }

   /**
-   * Write a region open marker indicating that the region is opened
+   * Write a region open marker indicating that the region is opened.
+   * This write is for internal use only. Not for external client consumption.
    */
-  public static long writeRegionEventMarker(WAL wal, HTableDescriptor htd, HRegionInfo hri,
+  public static WALKey writeRegionEventMarker(WAL wal, HTableDescriptor htd, HRegionInfo hri,
       final RegionEventDescriptor r, final MultiVersionConcurrencyControl mvcc)
   throws IOException {
-    long trx = writeMarker(wal, htd, hri, WALEdit.createRegionEventWALEdit(hri, r), mvcc, true);
+    WALKey walKey = writeMarker(wal, htd, hri, WALEdit.createRegionEventWALEdit(hri, r), mvcc);
     if (LOG.isTraceEnabled()) {
       LOG.trace("Appended region event marker " + TextFormat.shortDebugString(r));
     }
-    return trx;
+    return walKey;
   }

   /**
    * Write a log marker that a bulk load has succeeded and is about to be committed.
-   *
-   * @param wal The log to write into.
-   * @param htd A description of the table that we are bulk loading into.
-   * @param hri A description of the region in the table that we are bulk loading into.
+   * This write is for internal use only. Not for external client consumption.
+   * @param wal  The log to write into.
+   * @param htd  A description of the table that we are bulk loading into.
+   * @param hri  A description of the region in the table that we are bulk loading into.
    * @param desc A protocol buffers based description of the client's bulk loading request
-   * @return txid of this transaction or if nothing to do, the last txid
+   * @return walKey with sequenceid filled out for this bulk load marker
    * @throws IOException We will throw an IOException if we can not append to the HLog.
    */
-  public static long writeBulkLoadMarkerAndSync(final WAL wal, final HTableDescriptor htd,
+  public static WALKey writeBulkLoadMarkerAndSync(final WAL wal, final HTableDescriptor htd,
       final HRegionInfo hri, final WALProtos.BulkLoadDescriptor desc,
       final MultiVersionConcurrencyControl mvcc)
   throws IOException {
-    long trx = writeMarker(wal, htd, hri, WALEdit.createBulkLoadEvent(hri, desc), mvcc, true);
+    WALKey walKey = writeMarker(wal, htd, hri, WALEdit.createBulkLoadEvent(hri, desc), mvcc);
     if (LOG.isTraceEnabled()) {
       LOG.trace("Appended Bulk Load marker " + TextFormat.shortDebugString(desc));
     }
-    return trx;
+    return walKey;
   }

-  private static long writeMarker(final WAL wal, final HTableDescriptor htd, final HRegionInfo hri,
-      final WALEdit edit, final MultiVersionConcurrencyControl mvcc, final boolean sync)
+  private static WALKey writeMarker(final WAL wal, final HTableDescriptor htd,
+      final HRegionInfo hri, final WALEdit edit, final MultiVersionConcurrencyControl mvcc)
+  throws IOException {
+    // If sync == true in below, then timeout is not used; safe to pass UNSPECIFIED_TIMEOUT
+    return doFullAppendTransaction(wal, htd, hri, edit, mvcc, true);
+  }
+
+  /**
+   * A 'full' WAL transaction involves starting an mvcc transaction followed by an append,
+   * an optional sync, and then a call to complete the mvcc transaction. This method does it all.
+   * Good for the case of adding a single edit or marker to the WAL.
+   *
+   * <p>This write is for internal use only. Not for external client consumption.
+   * @return WALKey that was added to the WAL.
+   */
+  public static WALKey doFullAppendTransaction(final WAL wal, final HTableDescriptor htd,
+      final HRegionInfo hri, final WALEdit edit, final MultiVersionConcurrencyControl mvcc,
+      final boolean sync)
   throws IOException {
     // TODO: Pass in current time to use?
-    WALKey key =
-        new HLogKey(hri.getEncodedNameAsBytes(), hri.getTable(), System.currentTimeMillis(), mvcc);
-    // Add it to the log but the false specifies that we don't need to add it to the memstore
+    WALKey walKey =
+        new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(), System.currentTimeMillis(), mvcc);
     long trx = MultiVersionConcurrencyControl.NONE;
     try {
-      trx = wal.append(htd, hri, key, edit, false);
-      if (sync) wal.sync(trx);
-    } finally {
-      // If you get hung here, is it a real WAL or a mocked WAL? If the latter, you need to
-      // trip the latch that is inside in getWriteEntry up in your mock. See down in the append
-      // called from onEvent in FSHLog.
-      MultiVersionConcurrencyControl.WriteEntry we = key.getWriteEntry();
-      if (mvcc != null && we != null) mvcc.complete(we);
+      trx = wal.append(htd, hri, walKey, edit, false);
+      if (sync) {
+        wal.sync(trx);
+      }
+      // Call complete only here because these are markers only. They are not for clients to read.
+      mvcc.complete(walKey.getWriteEntry());
+    } catch (IOException ioe) {
+      mvcc.complete(walKey.getWriteEntry());
+      throw ioe;
     }
-    return trx;
+    return walKey;
   }
 }
\ No newline at end of file
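doFullAppendTransaction above fixes the order every WAL write now follows: append (which starts the mvcc transaction and stamps the WriteEntry), optional sync, then complete the mvcc transaction on both the success and failure paths. A hypothetical caller, using only signatures visible in this diff; the FlushDescriptor import path is assumed and the variable names are illustrative:

import java.io.IOException;

import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALKey;

public class FlushMarkerExample {
  // One call runs the whole append + sync + mvcc-complete transaction; by the
  // time it returns, the marker's sequenceid has been assigned and published.
  static long writeFlushStart(WAL wal, HTableDescriptor htd, HRegionInfo hri,
      FlushDescriptor flushDesc, MultiVersionConcurrencyControl mvcc) throws IOException {
    WALKey walKey = WALUtil.writeFlushMarker(wal, htd, hri, flushDesc, true, mvcc);
    return walKey.getSequenceId(); // plain read; no latch wait after append
  }
}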
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
index 4091a82..09096fe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
@@ -30,52 +30,50 @@ import java.util.NavigableMap;
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;

-import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
-import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FamilyScope;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.ScopeType;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.regionserver.SequenceId;
+// imports for things that haven't moved from regionserver.wal yet.
+import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
+import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
+import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;

 import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ByteString;

-// imports for things that haven't moved from regionserver.wal yet.
-import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
-import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
-
 /**
- * A Key for an entry in the change log.
+ * A Key for an entry in the WAL.
  *
  * The log intermingles edits to many tables and rows, so each log entry
  * identifies the appropriate table and row. Within a table and row, they're
  * also sorted.
  *
- * <p>Some Transactional edits (START, COMMIT, ABORT) will not have an
- * associated row.
+ * <p>Some Transactional edits (START, COMMIT, ABORT) will not have an associated row.
  *
  * Note that protected members marked @InterfaceAudience.Private are only protected
  * to support the legacy HLogKey class, which is in a different package.
- *
- * <p>
  */
 // TODO: Key and WALEdit are never used separately, or in one-to-many relation, for practical
 //       purposes. They need to be merged into WALEntry.
-// TODO: Cleanup. We have logSeqNum and then WriteEntry, both are sequence id'ing. Fix.
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
 public class WALKey implements SequenceId, Comparable<WALKey> {
   private static final Log LOG = LogFactory.getLog(WALKey.class);

+  private final CountDownLatch sequenceIdAssignedLatch = new CountDownLatch(1);
+
+  /**
+   * Used to represent when a particular wal key doesn't know/care about the sequence ordering.
+   */
+  public static final long NO_SEQUENCE_ID = -1;

   @InterfaceAudience.Private // For internal use only.
   public MultiVersionConcurrencyControl getMvcc() {
@@ -83,25 +81,22 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
   }

   /**
-   * Will block until a write entry has been assigned by they WAL subsystem.
-   * @return A WriteEntry gotten from local WAL subsystem. Must be completed by calling
-   * {@link MultiVersionConcurrencyControl#complete(MultiVersionConcurrencyControl.WriteEntry)}
-   * or
-   * {@link MultiVersionConcurrencyControl#complete(MultiVersionConcurrencyControl.WriteEntry)}
+   * Use it to complete the mvcc transaction this WALKey was part of (the transaction is started
+   * when you call append; see the comment on FSHLog#append). To complete, call
+   * {@link MultiVersionConcurrencyControl#complete(MultiVersionConcurrencyControl.WriteEntry)}
+   * or {@link MultiVersionConcurrencyControl#completeAndWait(MultiVersionConcurrencyControl.WriteEntry)}.
+   * @return A WriteEntry gotten from local WAL subsystem.
    * @see #setWriteEntry(MultiVersionConcurrencyControl.WriteEntry)
    */
   @InterfaceAudience.Private // For internal use only.
   public MultiVersionConcurrencyControl.WriteEntry getWriteEntry() throws InterruptedIOException {
     try {
-      this.seqNumAssignedLatch.await();
+      this.sequenceIdAssignedLatch.await();
     } catch (InterruptedException ie) {
-      // If interrupted... clear out our entry else we can block up mvcc.
       MultiVersionConcurrencyControl mvcc = getMvcc();
-      LOG.debug("mvcc=" + mvcc + ", writeEntry=" + this.writeEntry);
-      if (mvcc != null) {
-        if (this.writeEntry != null) {
-          mvcc.complete(this.writeEntry);
-        }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("mvcc=" + mvcc + ", writeEntry=" + this.writeEntry);
       }
       InterruptedIOException iie = new InterruptedIOException();
       iie.initCause(ie);
@@ -112,11 +107,19 @@ public class WALKey implements SequenceId, Comparable<WALKey> {

   @InterfaceAudience.Private // For internal use only.
   public void setWriteEntry(MultiVersionConcurrencyControl.WriteEntry writeEntry) {
+    if (this.writeEntry != null) {
+      throw new RuntimeException("Non-null!!!");
+    }
     this.writeEntry = writeEntry;
-    this.seqNumAssignedLatch.countDown();
+    // Set our sequenceid now using WriteEntry.
+    if (this.writeEntry != null) {
+      this.sequenceId = this.writeEntry.getWriteNumber();
+    }
+    this.sequenceIdAssignedLatch.countDown();
   }

-  // should be < 0 (@see HLogKey#readFields(DataInput))
+  // REMOVE!!!! No more Writables!!!!
+  // Should be < 0 (@see HLogKey#readFields(DataInput))
   // version 2 supports WAL compression
   // public members here are only public because of HLogKey
   @InterfaceAudience.Private
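setWriteEntry above is the single place a key's sequenceid gets published, and the latch is what getWriteEntry blocks on. The same publish/await handshake in a self-contained sketch, plain java.util.concurrent with no HBase types:

import java.util.concurrent.CountDownLatch;

// Self-contained sketch of the setWriteEntry/getWriteEntry handshake above:
// one thread publishes the assigned sequenceid exactly once; any number of
// threads block until it is available.
public class SequenceIdHandshakeSketch {
  private final CountDownLatch assigned = new CountDownLatch(1);
  private volatile long sequenceId = -1; // stands in for NO_SEQUENCE_ID

  // Publish once, from the appending side (compare setWriteEntry above).
  void publish(long seqId) {
    if (this.sequenceId != -1) {
      throw new IllegalStateException("sequenceid already assigned");
    }
    this.sequenceId = seqId;
    this.assigned.countDown(); // releases every waiter
  }

  // Block until published (compare getWriteEntry above).
  long await() throws InterruptedException {
    this.assigned.await();
    return this.sequenceId;
  }

  public static void main(String[] args) throws InterruptedException {
    final SequenceIdHandshakeSketch key = new SequenceIdHandshakeSketch();
    Thread waiter = new Thread(new Runnable() {
      @Override
      public void run() {
        try {
          System.out.println("assigned=" + key.await());
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }
    });
    waiter.start();
    key.publish(42); // the WAL side does this exactly once
    waiter.join();
  }
}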
@@ -163,21 +166,23 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
   @InterfaceAudience.Private
   protected static final Version VERSION = Version.COMPRESSED;

-  /** Used to represent when a particular wal key doesn't know/care about the sequence ordering. */
-  public static final long NO_SEQUENCE_ID = -1;
-
-  // visible for deprecated HLogKey
   @InterfaceAudience.Private
   protected byte [] encodedRegionName;

   // visible for deprecated HLogKey
   @InterfaceAudience.Private
   protected TableName tablename;

-  // visible for deprecated HLogKey
-  @InterfaceAudience.Private
-  protected long logSeqNum;
+  /**
+   * SequenceId for this edit. Set post-construction at write-to-WAL time. Until then it is
+   * NO_SEQUENCE_ID. Change it so multiple threads can read it -- e.g. access is synchronized.
+   */
+  private long sequenceId;
+
+  /**
+   * Used during WAL replay; the sequenceId of the edit when it came into the system.
+   */
   private long origLogSeqNum = 0;
-  private CountDownLatch seqNumAssignedLatch = new CountDownLatch(1);
+
   // Time at which this edit was written.
   // visible for deprecated HLogKey
   @InterfaceAudience.Private
@@ -193,6 +198,9 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
   private long nonceGroup = HConstants.NO_NONCE;
   private long nonce = HConstants.NO_NONCE;
   private MultiVersionConcurrencyControl mvcc;
+  /**
+   * Set in a way visible to multiple threads; e.g. synchronized getter/setters.
+   */
   private MultiVersionConcurrencyControl.WriteEntry writeEntry;
   public static final List<UUID> EMPTY_UUIDS = Collections.unmodifiableList(new ArrayList<UUID>());

@@ -215,10 +223,15 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
         HConstants.NO_NONCE, HConstants.NO_NONCE, null);
   }

+  /**
+   * @deprecated Remove. Useless.
+   */
+  @Deprecated // REMOVE
   public WALKey(final byte[] encodedRegionName, final TableName tablename) {
     this(encodedRegionName, tablename, System.currentTimeMillis());
   }

+  // TODO: Fix being able to pass in sequenceid.
   public WALKey(final byte[] encodedRegionName, final TableName tablename, final long now) {
     init(encodedRegionName,
         tablename,
@@ -257,6 +270,7 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
    * @param now Time at which this edit was written.
    * @param clusterIds the clusters that have consumed the change(used in Replication)
    */
+  // TODO: Fix being able to pass in sequenceid.
   public WALKey(final byte[] encodedRegionName,
                 final TableName tablename,
                 long logSeqNum,
@@ -300,6 +314,7 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
    * @param nonceGroup
    * @param nonce
    */
+  // TODO: Fix being able to pass in sequenceid.
   public WALKey(final byte[] encodedRegionName,
                 final TableName tablename,
                 long logSeqNum,
@@ -325,7 +340,7 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
                     long nonceGroup,
                     long nonce,
                     MultiVersionConcurrencyControl mvcc) {
-    this.logSeqNum = logSeqNum;
+    this.sequenceId = logSeqNum;
     this.writeTime = now;
     this.clusterIds = clusterIds;
     this.encodedRegionName = encodedRegionName;
@@ -333,6 +348,15 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
     this.nonceGroup = nonceGroup;
     this.nonce = nonce;
     this.mvcc = mvcc;
+    if (logSeqNum != NO_SEQUENCE_ID) {
+      setSequenceId(logSeqNum);
+    }
+  }
+
+  // For HLogKey and deserialization. DO NOT USE. See setWriteEntry below.
+  @InterfaceAudience.Private
+  protected void setSequenceId(long sequenceId) {
+    this.sequenceId = sequenceId;
   }

   /**
@@ -352,32 +376,24 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
     return tablename;
   }

-  /** @return log sequence number */
-  public long getLogSeqNum() {
-    return this.logSeqNum;
-  }
-
-  /**
-   * Allow that the log sequence id to be set post-construction
-   * Only public for org.apache.hadoop.hbase.regionserver.wal.FSWALEntry
-   * @param sequence
-   */
-  @InterfaceAudience.Private
-  public void setLogSeqNum(final long sequence) {
-    this.logSeqNum = sequence;
-
+  /** @return log sequence number
+   * @deprecated Use {@link #getSequenceId()}
+   */
+  @Deprecated
+  public long getLogSeqNum() {
+    return getSequenceId();
   }

   /**
-   * Used to set original seq Id for WALKey during wal replay
-   * @param seqId
+   * Used to set original sequenceId for WALKey during WAL replay
    */
-  public void setOrigLogSeqNum(final long seqId) {
-    this.origLogSeqNum = seqId;
+  public void setOrigLogSeqNum(final long sequenceId) {
+    this.origLogSeqNum = sequenceId;
   }

   /**
-   * Return a positive long if current WALKey is created from a replay edit
+   * Return a positive long if current WALKey is created from a replay edit; a replay edit is an
+   * edit that came in when replaying WALs of a crashed server.
    * @return original sequence number of the WALEdit
    */
   public long getOrigLogSeqNum() {
@@ -385,43 +401,14 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
   }

   /**
-   * Wait for sequence number to be assigned & return the assigned value
+   * SequenceId is only available post WAL-assign. Calls before this will get you a
+   * {@link #NO_SEQUENCE_ID}. See the comment on FSHLog#append and #getWriteNumber in this method
+   * for more on when this sequenceId comes available.
    * @return long the new assigned sequence number
-   * @throws IOException
    */
   @Override
-  public long getSequenceId() throws IOException {
-    return getSequenceId(-1);
-  }
-
-  /**
-   * Wait for sequence number to be assigned & return the assigned value.
-   * @param maxWaitForSeqId maximum time to wait in milliseconds for sequenceid
-   * @return long the new assigned sequence number
-   * @throws IOException
-   */
-  public long getSequenceId(final long maxWaitForSeqId) throws IOException {
-    // TODO: This implementation waiting on a latch is problematic because if a higher level
-    // determines we should stop or abort, there is no global list of all these blocked WALKeys
-    // waiting on a sequence id; they can't be cancelled... interrupted. See getNextSequenceId.
-    //
-    // UPDATE: I think we can remove the timeout now we are stamping all walkeys with sequenceid,
-    // even those that have failed (previously we were not... so they would just hang out...).
-    // St.Ack 20150910
-    try {
-      if (maxWaitForSeqId < 0) {
-        this.seqNumAssignedLatch.await();
-      } else if (!this.seqNumAssignedLatch.await(maxWaitForSeqId, TimeUnit.MILLISECONDS)) {
-        throw new TimeoutIOException("Failed to get sequenceid after " + maxWaitForSeqId +
-          "ms; WAL system stuck or has gone away?");
-      }
-    } catch (InterruptedException ie) {
-      LOG.warn("Thread interrupted waiting for next log sequence number");
-      InterruptedIOException iie = new InterruptedIOException();
-      iie.initCause(ie);
-      throw iie;
-    }
-    return this.logSeqNum;
+  public long getSequenceId() {
+    return this.sequenceId;
   }
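Taken together, the WALKey changes define a simple lifecycle: NO_SEQUENCE_ID at construction, stamped once at append time, a plain field read thereafter. A hypothetical walk-through using only constructors and methods shown in this diff; the pre-append assertion is inferred from the sequenceId field javadoc above, and all parameters are assumed live instances:

import java.io.IOException;

import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALKey;

public class SequenceIdLifecycleSketch {
  static long walkThrough(WAL wal, HTableDescriptor htd, HRegionInfo hri,
      WALEdit edit, MultiVersionConcurrencyControl mvcc) throws IOException {
    WALKey key = new WALKey(hri.getEncodedNameAsBytes(), hri.getTable(),
        System.currentTimeMillis(), mvcc);
    assert key.getSequenceId() == WALKey.NO_SEQUENCE_ID; // nothing assigned yet

    long txid = wal.append(htd, hri, key, edit, true); // true: edit also goes to memstore
    wal.sync(txid); // after a sync, the WAL internals have stamped the key

    long seqId = key.getSequenceId(); // now a plain field read, no IOException
    // Always complete the mvcc transaction the append started, success or not.
    mvcc.complete(key.getWriteEntry());
    return seqId;
  }
}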
   /**
@@ -495,7 +482,7 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
   @Override
   public String toString() {
     return tablename + "/" + Bytes.toString(encodedRegionName) + "/" +
-      logSeqNum;
+      sequenceId;
   }

   /**
@@ -509,7 +496,7 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
     Map<String, Object> stringMap = new HashMap<String, Object>();
     stringMap.put("table", tablename);
     stringMap.put("region", Bytes.toStringBinary(encodedRegionName));
-    stringMap.put("sequence", logSeqNum);
+    stringMap.put("sequence", getSequenceId());
     return stringMap;
   }

@@ -527,7 +514,7 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
   @Override
   public int hashCode() {
     int result = Bytes.hashCode(this.encodedRegionName);
-    result ^= this.logSeqNum;
+    result ^= getSequenceId();
     result ^= this.writeTime;
     return result;
   }

@@ -536,9 +523,11 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
   public int compareTo(WALKey o) {
     int result = Bytes.compareTo(this.encodedRegionName, o.encodedRegionName);
     if (result == 0) {
-      if (this.logSeqNum < o.logSeqNum) {
+      long sid = getSequenceId();
+      long otherSid = o.getSequenceId();
+      if (sid < otherSid) {
         result = -1;
-      } else if (this.logSeqNum > o.logSeqNum) {
+      } else if (sid > otherSid) {
         result = 1;
       }
       if (result == 0) {
@@ -592,7 +581,7 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
       builder.setTableName(compressor.compress(this.tablename.getName(),
          compressionContext.tableDict));
     }
-    builder.setLogSequenceNumber(this.logSeqNum);
+    builder.setLogSequenceNumber(getSequenceId());
     builder.setWriteTime(writeTime);
     if (this.origLogSeqNum > 0) {
       builder.setOrigSequenceNumber(this.origLogSeqNum);
@@ -658,10 +647,10 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
         this.scopes.put(family, scope.getScopeType().getNumber());
       }
     }
-    this.logSeqNum = walKey.getLogSequenceNumber();
+    setSequenceId(walKey.getLogSequenceNumber());
     this.writeTime = walKey.getWriteTime();
     if(walKey.hasOrigSequenceNumber()) {
       this.origLogSeqNum = walKey.getOrigSequenceNumber();
     }
   }
-}
+}
\ No newline at end of file
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 60bc155..7add8a9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -4765,12 +4765,12 @@ public class TestHRegion {
     durabilityTest(method, Durability.ASYNC_WAL, Durability.USE_DEFAULT, 5000, true, false, true);

     // expect skip wal cases
-    durabilityTest(method, Durability.SYNC_WAL, Durability.SKIP_WAL, 0, true, false, false);
-    durabilityTest(method, Durability.FSYNC_WAL, Durability.SKIP_WAL, 0, true, false, false);
-    durabilityTest(method, Durability.ASYNC_WAL, Durability.SKIP_WAL, 0, true, false, false);
-    durabilityTest(method, Durability.SKIP_WAL, Durability.SKIP_WAL, 0, true, false, false);
-    durabilityTest(method, Durability.USE_DEFAULT, Durability.SKIP_WAL, 0, true, false, false);
-    durabilityTest(method, Durability.SKIP_WAL, Durability.USE_DEFAULT, 0, true, false, false);
+    durabilityTest(method, Durability.SYNC_WAL, Durability.SKIP_WAL, 0, false, false, false);
+    durabilityTest(method, Durability.FSYNC_WAL, Durability.SKIP_WAL, 0, false, false, false);
+    durabilityTest(method, Durability.ASYNC_WAL, Durability.SKIP_WAL, 0, false, false, false);
+    durabilityTest(method, Durability.SKIP_WAL, Durability.SKIP_WAL, 0, false, false, false);
+    durabilityTest(method, Durability.USE_DEFAULT, Durability.SKIP_WAL, 0, false, false, false);
+    durabilityTest(method, Durability.SKIP_WAL, Durability.USE_DEFAULT, 0, false, false, false);
   }
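The flipped booleans above encode the user-visible effect of dropping the fake empty append: a SKIP_WAL mutation no longer touches the WAL at all, so the test's "saw a WAL append" expectation becomes false. A sketch of the kind of check durabilityTest presumably performs; its real signature and observer wiring sit outside this hunk, so everything below is hypothetical:

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.regionserver.Region;

public class SkipWalExpectationSketch {
  static void checkWalAppend(Region region, Put put, Durability durability,
      AtomicBoolean walAppendSeen, boolean expectAppend) throws IOException {
    walAppendSeen.set(false); // flag a coprocessor would set on WAL append
    put.setDurability(durability);
    region.put(put);
    // With the fake empty append gone, SKIP_WAL writes never reach the WAL,
    // so the observer flag stays false for every SKIP_WAL row above.
    assertEquals(expectAppend, walAppendSeen.get());
  }
}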