diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
index f667cb6..b93fd60 100644
--- hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
@@ -737,7 +737,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable {
         c.getTimestamp(), Type.codeToType(c.getTypeByte()), c.getValueArray(), c.getValueOffset(),
         c.getValueLength(), c.getTagsArray(), c.getTagsOffset(), c.getTagsLength());
   }
-
+
   /**
    * Create an empty byte[] representing a KeyValue
    * All lengths are preset and can be filled in later.
@@ -2053,10 +2053,38 @@ public class KeyValue implements Cell, HeapSize, Cloneable {
       return compare;
     }
 
+    // Negate the following comparisons so later edits show up first
+
+    // Compare log replay tag values if there are any.
+    // When either keyvalue is tagged with a log replay sequence number, we need to compare them:
+    // 1) when both keyvalues have the tag, use the tag values for the comparison
+    // 2) when only one has the tag, the one without the log replay tag wins, because it means
+    //    the edit isn't from recovery but a new one coming from clients during recovery
+    // 3) when neither has the tag, skip to the mvcc comparison below
+    long leftChangeSeqNum = getReplaySeqNum(left);
+    long rightChangeSeqNum = getReplaySeqNum(right);
+    if (leftChangeSeqNum != Long.MAX_VALUE || rightChangeSeqNum != Long.MAX_VALUE) {
+      return Longs.compare(rightChangeSeqNum, leftChangeSeqNum);
+    }
+
     // compare Mvcc Version
-    // Negate this comparison so later edits show up first
     return Longs.compare(right.getMvccVersion(), left.getMvccVersion());
   }
+
+  /**
+   * Return the replay log sequence number for the cell
+   * @param c the cell to inspect
+   * @return the value of the cell's LOG_REPLAY_TAG, or Long.MAX_VALUE if there is no such tag
+   */
+  private long getReplaySeqNum(final Cell c) {
+    Tag tag = Tag.getTag(c.getTagsArray(), c.getTagsOffset(), c.getTagsLength(),
+        TagType.LOG_REPLAY_TAG_TYPE);
+
+    if (tag != null) {
+      return Bytes.toLong(tag.getBuffer(), tag.getTagOffset(), tag.getTagLength());
+    }
+    return Long.MAX_VALUE;
+  }
 
   public int compareTimestamps(final KeyValue left, final KeyValue right) {
     // Compare timestamps
@@ -2735,6 +2763,27 @@ public class KeyValue implements Cell, HeapSize, Cloneable {
     in.readFully(bytes);
     return new KeyValue(bytes, 0, length);
   }
+
+  /**
+   * Create a new KeyValue by copying an existing cell and adding new tags
+   * @param c the cell to copy
+   * @param newTags the tags to append to the copy
+   * @return a new KeyValue instance carrying the new tags
+   */
+  public static KeyValue cloneAndAddTags(Cell c, List<Tag> newTags) {
+    List<Tag> existingTags = null;
+    if (c.getTagsLength() > 0) {
+      existingTags = Tag.asList(c.getTagsArray(), c.getTagsOffset(), c.getTagsLength());
+      existingTags.addAll(newTags);
+    } else {
+      existingTags = newTags;
+    }
+    return new KeyValue(c.getRowArray(), c.getRowOffset(), (int) c.getRowLength(),
+        c.getFamilyArray(), c.getFamilyOffset(), (int) c.getFamilyLength(),
+        c.getQualifierArray(), c.getQualifierOffset(), (int) c.getQualifierLength(),
+        c.getTimestamp(), Type.codeToType(c.getTypeByte()), c.getValueArray(), c.getValueOffset(),
+        c.getValueLength(), existingTags);
+  }
 
   /**
    * Create a KeyValue reading from the raw InputStream.
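
The comparator change above is the consumer side of the new log replay tag: when two cells for the same key collide during recovery, a higher original sequence number wins, and an untagged cell (a fresh client write) beats any replayed cell. Below is a minimal sketch of that ordering rule in plain Java rather than HBase code; it only assumes Guava's `Longs`, which HBase already depends on, and `Long.MAX_VALUE` stands for "no LOG_REPLAY_TAG", matching `getReplaySeqNum()` above.

```java
import com.google.common.primitives.Longs;

// Sketch only, not HBase code: the ordering the patched comparator establishes.
public class ReplayOrderSketch {
  static final long NO_TAG = Long.MAX_VALUE;

  // Mirrors the patched comparison: a higher replay seqnum (or an untagged,
  // i.e. fresher-than-recovery, edit) sorts first.
  static int compareBySeqNum(long leftSeqNum, long rightSeqNum) {
    if (leftSeqNum != NO_TAG || rightSeqNum != NO_TAG) {
      return Longs.compare(rightSeqNum, leftSeqNum); // negated: later edits first
    }
    return 0; // neither tagged: fall through to the mvcc comparison
  }

  public static void main(String[] args) {
    System.out.println(compareBySeqNum(100, 200));      // > 0: right (seq 200) sorts first
    System.out.println(compareBySeqNum(NO_TAG, 200));   // < 0: untagged left sorts first
    System.out.println(compareBySeqNum(NO_TAG, NO_TAG)); // 0: defer to mvcc
  }
}
```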
diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/Tag.java hbase-common/src/main/java/org/apache/hadoop/hbase/Tag.java
index ad0c595..225d6c2 100644
--- hbase-common/src/main/java/org/apache/hadoop/hbase/Tag.java
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/Tag.java
@@ -155,6 +155,26 @@ public class Tag {
     }
     return tags;
   }
+
+  /**
+   * Retrieve the first tag from the tags byte array matching the passed in tag type
+   * @param b the byte array holding the tags
+   * @param offset offset of the first tag within b
+   * @param length total length of the tags within b
+   * @param type the tag type to look for
+   * @return the first matching tag, or null if there is no tag of the passed in tag type
+   */
+  public static Tag getTag(byte[] b, int offset, int length, byte type) {
+    int pos = offset;
+    while (pos < offset + length) {
+      short tagLen = Bytes.toShort(b, pos);
+      if (b[pos + TAG_LENGTH_SIZE] == type) {
+        return new Tag(b, pos, (short) (tagLen + TAG_LENGTH_SIZE));
+      }
+      pos += TAG_LENGTH_SIZE + tagLen;
+    }
+    return null;
+  }
 
   /**
    * Returns the total length of the entire tag entity
diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
new file mode 100644
index 0000000..6c43c78
--- /dev/null
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public final class TagType {
+  // Please declare new tag types here to avoid stepping on pre-existing tag types.
+  public static final byte ACL_TAG_TYPE = (byte) 1;
+  public static final byte VISIBILITY_TAG_TYPE = (byte) 2;
+  public static final byte LOG_REPLAY_TAG_TYPE = (byte) 3;
+}
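
`Tag.getTag()` above walks the serialized tag list inside a cell: each tag on the wire is a 2-byte length (`TAG_LENGTH_SIZE`), a 1-byte type, then the value, where the length field covers the type byte plus the value. A self-contained sketch of that layout and scan, using plain `java.nio` instead of the HBase `Tag`/`Bytes` classes (buffer contents and the helper are illustrative only):

```java
import java.nio.ByteBuffer;

// Sketch of the tag wire format: [2-byte length][1-byte type][value bytes].
public class TagScanSketch {
  static final int TAG_LENGTH_SIZE = 2;
  static final byte LOG_REPLAY_TAG_TYPE = (byte) 3;

  // Returns the offset of the first tag with the given type, or -1 (mirrors getTag()).
  static int findTag(byte[] b, int offset, int length, byte type) {
    int pos = offset;
    while (pos < offset + length) {
      short tagLen = ByteBuffer.wrap(b, pos, TAG_LENGTH_SIZE).getShort();
      if (b[pos + TAG_LENGTH_SIZE] == type) {
        return pos;
      }
      pos += TAG_LENGTH_SIZE + tagLen; // skip length prefix plus type+value
    }
    return -1;
  }

  public static void main(String[] args) {
    // Two tags: an ACL tag (type 1, 2-byte value) and a log replay tag
    // (type 3, 8-byte sequence number).
    ByteBuffer buf = ByteBuffer.allocate(16);
    buf.putShort((short) 3).put((byte) 1).putShort((short) 42);       // ACL tag
    buf.putShort((short) 9).put(LOG_REPLAY_TAG_TYPE).putLong(12345L); // replay tag
    byte[] b = buf.array();

    int pos = findTag(b, 0, b.length, LOG_REPLAY_TAG_TYPE);
    long seq = ByteBuffer.wrap(b, pos + TAG_LENGTH_SIZE + 1, 8).getLong();
    System.out.println("replay seq = " + seq); // prints 12345
  }
}
```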
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 14c2a5c..7369cc3 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -856,6 +856,15 @@ MasterServices, Server {
       // may also host user regions
     }
     Set<ServerName> previouslyFailedMetaRSs = getPreviouselyFailedMetaServersFromZK();
+    // We need to use the union of previouslyFailedMetaRSs recorded in ZK and previouslyFailedServers,
+    // instead of previouslyFailedMetaRSs alone, to address the following two situations:
+    // 1) the chained failure situation (recovery failed multiple times in a row)
+    // 2) the master gets killed right before it could delete the recovering hbase:meta from ZK while
+    //    the same server still has non-meta WALs to be replayed, so that
+    //    removeStaleRecoveringRegionsFromZK can't delete the stale hbase:meta region
+    // Passing more servers into splitMetaLog is all right: if a server doesn't have a hbase:meta WAL,
+    // it is a no-op for that server.
+    previouslyFailedMetaRSs.addAll(previouslyFailedServers);
 
     this.initializationBeforeMetaAssignment = true;
 
@@ -866,26 +875,11 @@ MasterServices, Server {
     // Make sure meta assigned before proceeding.
     status.setStatus("Assigning Meta Region");
-    assignMeta(status);
+    assignMeta(status, previouslyFailedMetaRSs);
     // check if master is shutting down because above assignMeta could return even hbase:meta isn't
     // assigned when master is shutting down
     if(this.stopped) return;
 
-    if (this.distributedLogReplay && (!previouslyFailedMetaRSs.isEmpty())) {
-      // replay WAL edits mode need new hbase:meta RS is assigned firstly
-      status.setStatus("replaying log for Meta Region");
-      // need to use union of previouslyFailedMetaRSs recorded in ZK and previouslyFailedServers
-      // instead of oldMetaServerLocation to address the following two situations:
-      // 1) the chained failure situation(recovery failed multiple times in a row).
-      // 2) master get killed right before it could delete the recovering hbase:meta from ZK while the
-      // same server still has non-meta wals to be replayed so that
-      // removeStaleRecoveringRegionsFromZK can't delete the stale hbase:meta region
-      // Passing more servers into splitMetaLog is all right. If a server doesn't have hbase:meta wal,
-      // there is no op for the server.
-      previouslyFailedMetaRSs.addAll(previouslyFailedServers);
-      this.fileSystemManager.splitMetaLog(previouslyFailedMetaRSs);
-    }
-
     status.setStatus("Submitting log splitting work for previously failed region servers");
     // Master has recovered hbase:meta region server and we put
     // other failed region servers in a queue to be handled later by SSH
@@ -982,17 +976,17 @@ MasterServices, Server {
   /**
    * Check hbase:meta is assigned. If not, assign it.
    * @param status MonitoredTask
+   * @param previouslyFailedMetaRSs previously failed servers that may hold a hbase:meta WAL
    * @throws InterruptedException
    * @throws IOException
    * @throws KeeperException
    */
-  void assignMeta(MonitoredTask status)
+  void assignMeta(MonitoredTask status, Set<ServerName> previouslyFailedMetaRSs)
       throws InterruptedException, IOException, KeeperException {
     // Work on meta region
     int assigned = 0;
     long timeout = this.conf.getLong("hbase.catalog.verification.timeout", 1000);
     status.setStatus("Assigning hbase:meta region");
-    ServerName logReplayFailedMetaServer = null;
 
     RegionStates regionStates = assignmentManager.getRegionStates();
     regionStates.createRegionState(HRegionInfo.FIRST_META_REGIONINFO);
@@ -1010,12 +1004,10 @@ MasterServices, Server {
         LOG.info("Forcing expire of " + currentMetaServer);
         serverManager.expireServer(currentMetaServer);
         splitMetaLogBeforeAssignment(currentMetaServer);
-        if (this.distributedLogReplay) {
-          logReplayFailedMetaServer = currentMetaServer;
-        }
+        previouslyFailedMetaRSs.add(currentMetaServer);
       }
-      // Make sure assignment manager knows where the meta is,
-      // so that meta sever shutdown handler kicks in.
+      // Make sure the following meta assignment happens
+      assignmentManager.getRegionStates().clearLastAssignment(HRegionInfo.FIRST_META_REGIONINFO);
       assignmentManager.assignMeta();
     }
   } else {
@@ -1028,19 +1020,18 @@ MasterServices, Server {
     enableMeta(TableName.META_TABLE_NAME);
+    if (this.distributedLogReplay && (!previouslyFailedMetaRSs.isEmpty())) {
+      // replay WAL edits mode needs the new hbase:meta RS assigned first
+      status.setStatus("replaying log for Meta Region");
+      this.fileSystemManager.splitMetaLog(previouslyFailedMetaRSs);
+    }
+
     // Make sure a hbase:meta location is set. We need to enable SSH here since
     // if the meta region server is died at this time, we need it to be re-assigned
     // by SSH so that system tables can be assigned.
     // No need to wait for meta is assigned = 0 when meta is just verified.
     enableServerShutdownHandler(assigned != 0);
 
-    // logReplayFailedMetaServer is set only if log replay is
-    // enabled and the current meta server is expired
-    if (logReplayFailedMetaServer != null) {
-      // In Replay WAL Mode, we need the new hbase:meta server online
-      this.fileSystemManager.splitMetaLog(logReplayFailedMetaServer);
-    }
-
     LOG.info("hbase:meta assigned=" + assigned + ", rit=" + rit + ", location="
         + catalogTracker.getMetaLocation());
     status.setStatus("META assigned.");
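
The net effect of the HMaster hunks above is a reordering: hbase:meta is assigned first, and only afterwards are its WAL edits replayed, with one server set (`previouslyFailedMetaRSs`) driving both steps instead of the removed `logReplayFailedMetaServer` special case. A condensed, self-contained sketch of the new control flow; plain strings stand in for `ServerName`, and the printlns stand in for the real assign/split calls:

```java
import java.util.HashSet;
import java.util.Set;

// Sketch only, not HBase code: the master startup ordering after this patch.
public class MetaRecoveryOrderSketch {
  static boolean distributedLogReplay = true;

  public static void main(String[] args) {
    Set<String> previouslyFailedServers = new HashSet<>();
    previouslyFailedServers.add("rs1,60020,1");
    Set<String> previouslyFailedMetaRSs = new HashSet<>(); // recorded in ZK

    // 1. Take the union, so chained failures and a stale recovering
    //    hbase:meta znode are both covered.
    previouslyFailedMetaRSs.addAll(previouslyFailedServers);

    // 2. Assign hbase:meta first; a dead current meta server is expired and
    //    added to the same set rather than tracked in a separate variable.
    previouslyFailedMetaRSs.add("deadMetaRS,60020,1");
    System.out.println("hbase:meta assigned");

    // 3. Only then replay meta WAL edits (replay needs a live region to write
    //    into); servers without a hbase:meta WAL are a no-op for splitMetaLog.
    if (distributedLogReplay && !previouslyFailedMetaRSs.isEmpty()) {
      System.out.println("splitMetaLog(" + previouslyFailedMetaRSs + ")");
    }
  }
}
```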
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
index ebab4b8..f18e08a 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -120,8 +121,7 @@ public class MasterFileSystem {
     FSUtils.setFsDefault(conf, new Path(this.fs.getUri()));
     // make sure the fs has the same conf
     fs.setConf(conf);
-    this.distributedLogReplay = this.conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
-      HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
+    this.distributedLogReplay = HLogSplitter.isDistributedLogReplay(this.conf);
     // setup the filesystem variable
     // set up the archived logs path
     this.oldLogDir = createInitialFileSystemLayout();
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
index bd3aec7..7537115 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
@@ -426,6 +426,10 @@ public class RegionStates {
    * Log split is done for a given region, so it is assignable now.
    */
   public synchronized void logSplit(final HRegionInfo region) {
+    clearLastAssignment(region);
+  }
+
+  public synchronized void clearLastAssignment(final HRegionInfo region) {
     lastAssignments.remove(region.getEncodedName());
   }
 
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
index 044911d..a783748 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
@@ -222,8 +222,7 @@ public class SplitLogManager extends ZooKeeperListener {
     this.timeout = conf.getInt("hbase.splitlog.manager.timeout", DEFAULT_TIMEOUT);
     this.unassignedTimeout =
       conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT);
-    this.distributedLogReplay = this.conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
-      HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
+    this.distributedLogReplay = HLogSplitter.isDistributedLogReplay(conf);
     LOG.info("Timeout=" + timeout + ", unassigned timeout=" + unassignedTimeout +
       ", distributedLogReplay=" + this.distributedLogReplay);
 
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
index 34a75e6..8a75dac 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.master.RegionState;
 import org.apache.hadoop.hbase.master.RegionState.State;
 import org.apache.hadoop.hbase.master.RegionStates;
 import org.apache.hadoop.hbase.master.ServerManager;
+import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
 import org.apache.zookeeper.KeeperException;
 
@@ -84,9 +85,7 @@ public class ServerShutdownHandler extends EventHandler {
       LOG.warn(this.serverName + " is NOT in deadservers; it should be!");
     }
     this.shouldSplitHlog = shouldSplitHlog;
-    this.distributedLogReplay = server.getConfiguration().getBoolean(
-      HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
-      HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
+    this.distributedLogReplay = HLogSplitter.isDistributedLogReplay(server.getConfiguration());
     this.regionAssignmentWaitTimeout = server.getConfiguration().getInt(
       HConstants.LOG_REPLAY_WAIT_REGION_TIMEOUT, 15000);
   }
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index e7b5a2c..e280020 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -195,7 +195,7 @@ public class HRegion implements HeapSize { // , Writable{
 
   public static final String LOAD_CFS_ON_DEMAND_CONFIG_KEY =
     "hbase.hregion.scan.loadColumnFamiliesOnDemand";
-
+
   /**
    * This is the global default value for durability. All tables/mutations not
    * defining a durability or using USE_DEFAULT will default to this value.
@@ -210,7 +210,7 @@ public class HRegion implements HeapSize { // , Writable{
    */
   final AtomicBoolean closing = new AtomicBoolean(false);
 
-  protected long completeSequenceId = -1L;
+  protected volatile long completeSequenceId = -1L;
 
   /**
    * Region level sequence Id.
    * It is used for appending WALEdits in HLog. Its default value is -1,
@@ -399,6 +399,8 @@ public class HRegion implements HeapSize { // , Writable{
   private RegionServerAccounting rsAccounting;
   private List<Pair<Long, Long>> recentFlushes = new ArrayList<Pair<Long, Long>>();
   private long flushCheckInterval;
+  // Force a memstore flush once this many changes have accumulated since the last flush
+  private long flushPerChanges;
   private long blockingMemStoreSize;
   final long threadWakeFrequency;
   // Used to guard closes
@@ -493,6 +495,12 @@ public class HRegion implements HeapSize { // , Writable{
         .addWritableMap(htd.getValues());
     this.flushCheckInterval = conf.getInt(MEMSTORE_PERIODIC_FLUSH_INTERVAL,
         DEFAULT_CACHE_FLUSH_INTERVAL);
+    this.flushPerChanges = conf.getLong(MEMSTORE_FLUSH_PER_CHANGES, DEFAULT_FLUSH_PER_CHANGES);
+    if (this.flushPerChanges > MAX_FLUSH_PER_CHANGES) {
+      throw new IllegalArgumentException(MEMSTORE_FLUSH_PER_CHANGES + " cannot exceed "
+          + MAX_FLUSH_PER_CHANGES);
+    }
+
     this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration",
         DEFAULT_ROWLOCK_WAIT_DURATION);
 
@@ -651,6 +659,12 @@ public class HRegion implements HeapSize { // , Writable{
     // Use maximum of log sequenceid or that which was found in stores
     // (particularly if no recovered edits, seqid will be -1).
     long nextSeqid = maxSeqId + 1;
+    if (this.isRecovering) {
+      // In distributedLogReplay mode, we don't know the last change sequence number because the
+      // region is opened before recovery completes. So we add a safety bumper to prevent new
+      // sequence numbers from overlapping sequence numbers that are already in use.
+      nextSeqid += this.flushPerChanges + 10000000; // add an extra 10 million
+    }
     LOG.info("Onlined " + this.getRegionInfo().getShortNameToLog() +
       "; next sequenceid=" + nextSeqid);
 
@@ -658,6 +672,7 @@ public class HRegion implements HeapSize { // , Writable{
 
     this.closing.set(false);
     this.closed.set(false);
+    this.completeSequenceId = nextSeqid;
 
     if (coprocessorHost != null) {
       status.setStatus("Running coprocessor post-open hooks");
       coprocessorHost.postOpen();
@@ -952,6 +967,16 @@ public class HRegion implements HeapSize { // , Writable{
   /** Default interval for the memstore flush */
   public static final int DEFAULT_CACHE_FLUSH_INTERVAL = 3600000;
 
+  /** Conf key to force a flush if there are already enough changes for one region in memstore */
+  public static final String MEMSTORE_FLUSH_PER_CHANGES =
+      "hbase.regionserver.flush.per.changes";
+  public static final long DEFAULT_FLUSH_PER_CHANGES = 30000000; // 30 million
+  /**
+   * MAX_FLUSH_PER_CHANGES is large enough because each KeyValue has 20+ bytes of overhead, so
+   * even 1 billion empty KVs occupy at least 20GB of memstore for a single region.
+   */
+  public static final long MAX_FLUSH_PER_CHANGES = 1000000000; // 1 billion
+
   /**
    * Close down this HRegion. Flush the cache unless abort parameter is true,
    * Shut down each HStore, don't service any more calls.
@@ -1452,6 +1477,9 @@ public class HRegion implements HeapSize { // , Writable{
    * Should the memstore be flushed now
    */
   boolean shouldFlush() {
+    if (this.completeSequenceId + this.flushPerChanges < this.sequenceId.get()) {
+      return true;
+    }
     if (flushCheckInterval <= 0) { //disabled
       return false;
     }
@@ -1662,9 +1690,7 @@ public class HRegion implements HeapSize { // , Writable{
     this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis();
 
     // Update the last flushed sequence id for region
-    if (this.rsServices != null) {
-      completeSequenceId = flushSeqId;
-    }
+    completeSequenceId = flushSeqId;
 
     // C. Finally notify anyone waiting on memstore to clear:
     // e.g. checkResources().
@@ -5237,7 +5263,7 @@ public class HRegion implements HeapSize { // , Writable{
       ClassSize.OBJECT +
       ClassSize.ARRAY +
       41 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
-      (11 * Bytes.SIZEOF_LONG) +
+      (12 * Bytes.SIZEOF_LONG) +
       5 * Bytes.SIZEOF_BOOLEAN);
 
   // woefully out of date - currently missing:
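
Two sequence-number rules are introduced in the HRegion hunks above: `shouldFlush()` forces a flush once `flushPerChanges` edits have accumulated since the last flush (`completeSequenceId`), and a region opened while still recovering bumps its next sequence id by `flushPerChanges` plus a 10M margin, since the flush bound also bounds how far the unreplayed WAL tail can reach. A small arithmetic sketch of both rules in plain Java, assuming the default values from the hunks above:

```java
// Sketch only, not HBase code: the two sequence-number rules added above.
public class FlushPerChangesSketch {
  static final long FLUSH_PER_CHANGES = 30_000_000L; // DEFAULT_FLUSH_PER_CHANGES

  // Rule 1: flush once more than FLUSH_PER_CHANGES edits have accumulated
  // since the last flush.
  static boolean shouldFlush(long completeSequenceId, long currentSequenceId) {
    return completeSequenceId + FLUSH_PER_CHANGES < currentSequenceId;
  }

  public static void main(String[] args) {
    System.out.println(shouldFlush(100, 100 + 30_000_000)); // false: exactly at the bound
    System.out.println(shouldFlush(100, 100 + 30_000_001)); // true: one past the bound

    // Rule 2: a recovering region can't know the highest sequence number still
    // buried in unreplayed WALs, but it can't be more than FLUSH_PER_CHANGES
    // past the last flush, so bump by that plus a 10M safety margin.
    long maxSeqIdFromStores = 500;
    long nextSeqid = maxSeqIdFromStores + 1 + FLUSH_PER_CHANGES + 10_000_000;
    System.out.println("recovering region starts at seqid " + nextSeqid);
  }
}
```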
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 951ef6d..119d3e0 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -104,6 +104,7 @@ import org.apache.hadoop.hbase.filter.ByteArrayComparable;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
 import org.apache.hadoop.hbase.ipc.PriorityFunction;
@@ -623,13 +624,11 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
     };
     this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
 
-    this.distributedLogReplay = this.conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
-      HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
-
     this.rsInfo = RegionServerInfo.newBuilder();
     // Put up the webui. Webui may come up on port other than configured if
     // that port is occupied. Adjust serverInfo if this is the case.
     this.rsInfo.setInfoPort(putUpWebUI());
+    this.distributedLogReplay = HLogSplitter.isDistributedLogReplay(this.conf);
   }
 
   /**
@@ -3925,6 +3924,8 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
     RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
     List<Pair<HLogKey, WALEdit>> walEntries = new ArrayList<Pair<HLogKey, WALEdit>>();
     List<MutationReplay> mutations = new ArrayList<MutationReplay>();
+    // when tags are enabled, we need to tag replay edits with the original log sequence number
+    boolean needAddReplayTag = (HFile.getFormatVersion(this.conf) >= 3);
     for (WALEntry entry : entries) {
       if (nonceManager != null) {
         long nonceGroup = entry.getKey().hasNonceGroup()
@@ -3934,8 +3935,8 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
       }
       Pair<HLogKey, WALEdit> walEntry = (coprocessorHost == null) ? null :
          new Pair<HLogKey, WALEdit>();
-      List<MutationReplay> edits =
-          HLogSplitter.getMutationsFromWALEntry(entry, cells, walEntry);
+      List<MutationReplay> edits = HLogSplitter.getMutationsFromWALEntry(entry,
+          cells, walEntry, needAddReplayTag);
       if (coprocessorHost != null) {
         // Start coprocessor replay here. The coprocessor is for each WALEdit instead of a
         // KeyValue.
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
index 5ae8642..4a83741 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
@@ -174,9 +174,7 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
     try {
       LOG.info("SplitLogWorker " + this.serverName + " starting");
       this.watcher.registerListener(this);
-      boolean distributedLogReplay = this.conf.getBoolean(
-        HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
-        HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
+      boolean distributedLogReplay = HLogSplitter.isDistributedLogReplay(conf);
       if (distributedLogReplay) {
         // initialize a new connection for splitlogworker configuration
         HConnectionManager.getConnection(conf);
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
index ddd928c..bcbf6ae 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
@@ -65,6 +65,9 @@ import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
+import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.client.ConnectionUtils;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HConnection;
@@ -73,6 +76,7 @@ import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
 import org.apache.hadoop.hbase.io.HeapSize;
+import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.master.SplitLogManager;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
@@ -163,6 +167,8 @@ public class HLogSplitter {
   // Min batch size when replay WAL edits
   private final int minBatchSize;
 
+  private static boolean bTagSupportEnabled = false;
+
   HLogSplitter(Configuration conf, Path rootDir, FileSystem fs,
       LastSequenceId idChecker, ZooKeeperWatcher zkw) {
     this.conf = conf;
@@ -178,8 +184,7 @@ public class HLogSplitter {
     // a larger minBatchSize may slow down recovery because replay writer has to wait for
     // enough edits before replaying them
     this.minBatchSize = conf.getInt("hbase.regionserver.wal.logreplay.batch.size", 64);
-    this.distributedLogReplay = this.conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
-      HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
+    this.distributedLogReplay = HLogSplitter.isDistributedLogReplay(conf);
 
     this.numWriterThreads = conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
     if (zkw != null && this.distributedLogReplay) {
@@ -191,6 +196,7 @@ public class HLogSplitter {
       this.distributedLogReplay = false;
       outputSink = new LogRecoveredEditsOutputSink(numWriterThreads);
     }
+
   }
 
   /**
@@ -1483,6 +1489,7 @@ public class HLogSplitter {
         if (!skippedKVs.isEmpty()) {
           kvs.removeAll(skippedKVs);
         }
+
        synchronized (serverToBufferQueueMap) {
          locKey = loc.getHostnamePort() + KEY_DELIMITER + table;
          List<Pair<HRegionLocation, HLog.Entry>> queue =
              serverToBufferQueueMap.get(locKey);
@@ -1851,6 +1858,33 @@ public class HLogSplitter {
     public final long nonce;
   }
 
+  /**
+   * Tag each edit to be replayed with its original log sequence number
+   * @param entry the WALEntry being replayed; supplies the original log sequence number
+   * @param cell the cell to tag
+   * @return the cell carrying a LOG_REPLAY_TAG, or the original cell if it already had one
+   */
+  private static Cell tagReplayLogSequenceNumber(WALEntry entry, Cell cell) {
+    // Tag puts with the original sequence number if there is no LOG_REPLAY_TAG yet
+    boolean needAddRecoveryTag = true;
+    if (cell.getTagsLength() > 0) {
+      Tag tmpTag = Tag.getTag(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength(),
+          TagType.LOG_REPLAY_TAG_TYPE);
+      if (tmpTag != null) {
+        // found an existing log replay tag so reuse it
+        needAddRecoveryTag = false;
+      }
+    }
+    if (needAddRecoveryTag) {
+      List<Tag> newTags = new ArrayList<Tag>();
+      Tag replayTag = new Tag(TagType.LOG_REPLAY_TAG_TYPE, Bytes.toBytes(entry.getKey()
+          .getLogSequenceNumber()));
+      newTags.add(replayTag);
+      return KeyValue.cloneAndAddTags(cell, newTags);
+    }
+    return cell;
+  }
+
   /**
    * This function is used to construct mutations from a WALEntry. It also reconstructs HLogKey &
    * WALEdit from the passed in WALEntry
@@ -1858,11 +1892,12 @@ public class HLogSplitter {
    * @param cells
    * @param logEntry pair of HLogKey and WALEdit instance stores HLogKey and WALEdit instances
    *          extracted from the passed in WALEntry.
+   * @param addLogReplayTag whether to tag each put with its original log sequence number
    * @return list of Pair<MutationType, Mutation> to be replayed
    * @throws IOException
    */
-  public static List<MutationReplay> getMutationsFromWALEntry(WALEntry entry,
-      CellScanner cells, Pair<HLogKey, WALEdit> logEntry) throws IOException {
+  public static List<MutationReplay> getMutationsFromWALEntry(WALEntry entry, CellScanner cells,
+      Pair<HLogKey, WALEdit> logEntry, boolean addLogReplayTag) throws IOException {
 
     if (entry == null) {
       // return an empty array
@@ -1907,7 +1942,11 @@ public class HLogSplitter {
         if (CellUtil.isDelete(cell)) {
           ((Delete) m).addDeleteMarker(KeyValueUtil.ensureKeyValue(cell));
         } else {
-          ((Put) m).add(KeyValueUtil.ensureKeyValue(cell));
+          Cell tmpNewCell = cell;
+          if (addLogReplayTag) {
+            tmpNewCell = tagReplayLogSequenceNumber(entry, cell);
+          }
+          ((Put) m).add(KeyValueUtil.ensureKeyValue(tmpNewCell));
         }
         previousCell = cell;
       }
@@ -1928,4 +1967,15 @@ public class HLogSplitter {
 
     return mutations;
   }
+
+  /**
+   * Returns whether distributed log replay is turned on
+   * @param conf the configuration to check
+   * @return true when distributed log replay is turned on
+   */
+  public static boolean isDistributedLogReplay(Configuration conf) {
+    // since HBase 0.98, distributed log replay is turned on by default when HFile v3 is used
+    return conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
+        (HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG || HFile.getFormatVersion(conf) >= 3));
+  }
 }
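
`tagReplayLogSequenceNumber()` above is the producer side of the replay tag: each replayed put is stamped with the sequence number its edit had in the original WAL, and `getReplaySeqNum()` in KeyValue decodes it again at compare time. A minimal round-trip sketch in plain Java (no HBase classes; an 8-byte big-endian long stands in for `Bytes.toBytes`/`Bytes.toLong`):

```java
import java.nio.ByteBuffer;

// Sketch only, not HBase code: the replay-tag round trip.
public class ReplayTagRoundTripSketch {
  public static void main(String[] args) {
    long originalLogSeqNum = 777L; // entry.getKey().getLogSequenceNumber()

    // tagReplayLogSequenceNumber(): encode the seqnum as the LOG_REPLAY tag value
    byte[] tagValue = ByteBuffer.allocate(8).putLong(originalLogSeqNum).array();

    // getReplaySeqNum(): decode it again when cells for the same key collide
    long decoded = ByteBuffer.wrap(tagValue).getLong();
    System.out.println(decoded); // 777

    // A fresh client edit written during recovery carries no tag, which the
    // comparator treats as Long.MAX_VALUE, so it wins over any replayed edit.
    long freshEdit = Long.MAX_VALUE;
    System.out.println(freshEdit > decoded); // true: the fresh edit sorts first
  }
}
```

Note how `isDistributedLogReplay()` centralizes the config read that was previously duplicated across HMaster, MasterFileSystem, SplitLogManager, ServerShutdownHandler, HRegionServer, and SplitLogWorker: the explicit `hbase.master.distributed.log.replay` setting still wins, but when it is unset, replay defaults to on once the HFile format version is 3 or higher (tags require HFile v3).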
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java
index 1a3d703..af35c8d 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessControlLists.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
 import org.apache.hadoop.hbase.catalog.MetaReader;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
@@ -103,7 +104,7 @@ public class AccessControlLists {
   public static final String ACL_LIST_FAMILY_STR = "l";
   public static final byte[] ACL_LIST_FAMILY = Bytes.toBytes(ACL_LIST_FAMILY_STR);
   /** KV tag to store per cell access control lists */
-  public static final byte ACL_TAG_TYPE = (byte) 1;
+  public static final byte ACL_TAG_TYPE = TagType.ACL_TAG_TYPE;
 
   public static final char NAMESPACE_PREFIX = '@';
 
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityUtils.java hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityUtils.java
index e11b972..a354ebb 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityUtils.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityUtils.java
@@ -23,6 +23,7 @@ import java.util.Map.Entry;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TagType;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.MultiUserAuthorizations;
@@ -43,7 +44,7 @@ public class VisibilityUtils {
   public static final String VISIBILITY_LABEL_GENERATOR_CLASS =
       "hbase.regionserver.scan.visibility.label.generator.class";
-  public static final byte VISIBILITY_TAG_TYPE = (byte) 2;
+  public static final byte VISIBILITY_TAG_TYPE = TagType.VISIBILITY_TAG_TYPE;
   public static final String SYSTEM_LABEL = "system";
 
   /**
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index 99a8f02..be16e30 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -962,11 +962,6 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
       hbaseAdmin = null;
     }
 
-    if (zooKeeperWatcher != null) {
-      zooKeeperWatcher.close();
-      zooKeeperWatcher = null;
-    }
-
     // unset the configuration for MIN and MAX RS to start
     conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
     conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1);
@@ -976,6 +971,11 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
       this.hbaseCluster.waitUntilShutDown();
       this.hbaseCluster = null;
     }
+
+    if (zooKeeperWatcher != null) {
+      zooKeeperWatcher.close();
+      zooKeeperWatcher = null;
+    }
   }
 
   /**
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
index 2b4e4f4..e81da00 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
@@ -70,17 +70,20 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.SplitLogCounters;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.NonceGenerator;
 import org.apache.hadoop.hbase.client.PerClientRandomNonceGenerator;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
 import org.apache.hadoop.hbase.exceptions.OperationConflictException;
 import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
 import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
@@ -1159,6 +1162,198 @@ public class TestDistributedLogSplitting {
     zkw.close();
   }
 
+  @Test(timeout = 300000)
+  public void testSameVersionUpdatesRecovery() throws Exception {
+    LOG.info("testSameVersionUpdatesRecovery");
+    conf.setLong("hbase.regionserver.hlog.blocksize", 15 * 1024);
+    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
+    conf.setInt("hfile.format.version", 3);
+    startCluster(NUM_RS);
+    final AtomicLong sequenceId = new AtomicLong(100);
+    final int NUM_REGIONS_TO_CREATE = 40;
+    final int NUM_LOG_LINES = 1000;
+    // turn off load balancing to prevent regions from moving around, otherwise
+    // they will consume recovered.edits
+    master.balanceSwitch(false);
+
+    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
+    final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
+    HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
+
+    List<HRegionInfo> regions = null;
+    HRegionServer hrs = null;
+    for (int i = 0; i < NUM_RS; i++) {
+      boolean isCarryingMeta = false;
+      hrs = rsts.get(i).getRegionServer();
+      regions = ProtobufUtil.getOnlineRegions(hrs);
+      for (HRegionInfo region : regions) {
+        if (region.isMetaRegion()) {
+          isCarryingMeta = true;
+          break;
+        }
+      }
+      if (isCarryingMeta) {
+        continue;
+      }
+      break;
+    }
+
+    LOG.info("#regions = " + regions.size());
+    Iterator<HRegionInfo> it = regions.iterator();
+    while (it.hasNext()) {
+      HRegionInfo region = it.next();
+      if (region.isMetaTable()
+          || region.getEncodedName().equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName())) {
+        it.remove();
+      }
+    }
+    if (regions.size() == 0) return;
+    HRegionInfo curRegionInfo = regions.get(0);
+    byte[] startRow = curRegionInfo.getStartKey();
+    if (startRow == null || startRow.length == 0) {
+      startRow = new byte[] { 0, 0, 0, 0, 1 };
+    }
+    byte[] row = Bytes.incrementBytes(startRow, 1);
+    // use the last 5 bytes because HBaseTestingUtility.createMultiRegions uses 5-byte keys
+    row = Arrays.copyOfRange(row, 3, 8);
+    long value = 0;
+    byte[] tableName = Bytes.toBytes("table");
+    byte[] family = Bytes.toBytes("family");
+    byte[] qualifier = Bytes.toBytes("c1");
+    long timeStamp = System.currentTimeMillis();
+    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
+    htd.addFamily(new HColumnDescriptor(family));
+    for (int i = 0; i < NUM_LOG_LINES; i += 1) {
+      WALEdit e = new WALEdit();
+      value++;
+      e.add(new KeyValue(row, family, qualifier, timeStamp, Bytes.toBytes(value)));
+      hrs.getWAL().append(curRegionInfo, TableName.valueOf(tableName), e,
+          System.currentTimeMillis(), htd, sequenceId);
+    }
+    hrs.getWAL().sync();
+    hrs.getWAL().close();
+
+    // wait for the abort to complete
+    this.abortRSAndWaitForRecovery(hrs, zkw, NUM_REGIONS_TO_CREATE);
+
+    // verify we got the last value
+    LOG.info("Verification Starts...");
+    Get g = new Get(row);
+    Result r = ht.get(g);
+    long theStoredVal = Bytes.toLong(r.getValue(family, qualifier));
+    assertEquals(value, theStoredVal);
+
+    // after flush
+    LOG.info("Verification after flush...");
+    TEST_UTIL.getHBaseAdmin().flush(tableName);
+    r = ht.get(g);
+    theStoredVal = Bytes.toLong(r.getValue(family, qualifier));
+    assertEquals(value, theStoredVal);
+    ht.close();
+  }
+
+  @Test(timeout = 300000)
+  public void testSameVersionUpdatesRecoveryWithCompaction() throws Exception {
+    LOG.info("testSameVersionUpdatesRecoveryWithCompaction");
+    conf.setLong("hbase.regionserver.hlog.blocksize", 15 * 1024);
+    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
+    conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 30 * 1024);
+    conf.setInt("hbase.hstore.compactionThreshold", 3);
+    conf.setInt("hfile.format.version", 3);
+    startCluster(NUM_RS);
+    final AtomicLong sequenceId = new AtomicLong(100);
+    final int NUM_REGIONS_TO_CREATE = 40;
+    final int NUM_LOG_LINES = 1000;
+    // turn off load balancing to prevent regions from moving around, otherwise
+    // they will consume recovered.edits
+    master.balanceSwitch(false);
+
+    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
+    final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
+    HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
+
+    List<HRegionInfo> regions = null;
+    HRegionServer hrs = null;
+    for (int i = 0; i < NUM_RS; i++) {
+      boolean isCarryingMeta = false;
+      hrs = rsts.get(i).getRegionServer();
+      regions = ProtobufUtil.getOnlineRegions(hrs);
+      for (HRegionInfo region : regions) {
+        if (region.isMetaRegion()) {
+          isCarryingMeta = true;
+          break;
+        }
+      }
+      if (isCarryingMeta) {
+        continue;
+      }
+      break;
+    }
+
+    LOG.info("#regions = " + regions.size());
+    Iterator<HRegionInfo> it = regions.iterator();
+    while (it.hasNext()) {
+      HRegionInfo region = it.next();
+      if (region.isMetaTable()
+          || region.getEncodedName().equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName())) {
+        it.remove();
+      }
+    }
+    if (regions.size() == 0) return;
+    HRegionInfo curRegionInfo = regions.get(0);
+    byte[] startRow = curRegionInfo.getStartKey();
+    if (startRow == null || startRow.length == 0) {
+      startRow = new byte[] { 0, 0, 0, 0, 1 };
+    }
+    byte[] row = Bytes.incrementBytes(startRow, 1);
+    // use the last 5 bytes because HBaseTestingUtility.createMultiRegions uses 5-byte keys
+    row = Arrays.copyOfRange(row, 3, 8);
+    long value = 0;
+    final byte[] tableName = Bytes.toBytes("table");
+    byte[] family = Bytes.toBytes("family");
+    byte[] qualifier = Bytes.toBytes("c1");
+    long timeStamp = System.currentTimeMillis();
+    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
+    htd.addFamily(new HColumnDescriptor(family));
+    for (int i = 0; i < NUM_LOG_LINES; i += 1) {
+      WALEdit e = new WALEdit();
+      value++;
+      e.add(new KeyValue(row, family, qualifier, timeStamp, Bytes.toBytes(value)));
+      hrs.getWAL().append(curRegionInfo, TableName.valueOf(tableName), e,
+          System.currentTimeMillis(), htd, sequenceId);
+    }
+    hrs.getWAL().sync();
+    hrs.getWAL().close();
+
+    // wait for the abort to complete
+    this.abortRSAndWaitForRecovery(hrs, zkw, NUM_REGIONS_TO_CREATE);
+
+    // verify we got the last value
+    LOG.info("Verification Starts...");
+    Get g = new Get(row);
+    Result r = ht.get(g);
+    long theStoredVal = Bytes.toLong(r.getValue(family, qualifier));
+    assertEquals(value, theStoredVal);
+
+    // after flush & compaction
+    LOG.info("Verification after flush...");
+    TEST_UTIL.getHBaseAdmin().flush(tableName);
+    TEST_UTIL.getHBaseAdmin().compact(tableName);
+
+    // wait for the compaction to complete
+    TEST_UTIL.waitFor(30000, 200, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        return (TEST_UTIL.getHBaseAdmin().getCompactionState(tableName) == CompactionState.NONE);
+      }
+    });
+
+    r = ht.get(g);
+    theStoredVal = Bytes.toLong(r.getValue(family, qualifier));
+    assertEquals(value, theStoredVal);
+    ht.close();
+  }
+
   HTable installTable(ZooKeeperWatcher zkw, String tname, String fname, int nrs) throws Exception {
     return installTable(zkw, tname, fname, nrs, 0);
   }
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java
index 69c42fe..9275313 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java
@@ -27,6 +27,7 @@ import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
@@ -363,7 +364,7 @@ public class TestMasterNoCluster {
     HMaster master = new HMaster(conf) {
       @Override
-      void assignMeta(MonitoredTask status) {
+      void assignMeta(MonitoredTask status, Set<ServerName> previouslyFailedMetaRSs) {
       }
 
       @Override