diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
index f667cb6..6cb9f80 100644
--- hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
@@ -737,7 +737,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable {
         c.getTimestamp(), Type.codeToType(c.getTypeByte()), c.getValueArray(), c.getValueOffset(),
         c.getValueLength(), c.getTagsArray(), c.getTagsOffset(), c.getTagsLength());
   }
-  
+
   /**
    * Create an empty byte[] representing a KeyValue
    * All lengths are preset and can be filled in later.
@@ -2055,7 +2055,30 @@ public class KeyValue implements Cell, HeapSize, Cloneable {
       // compare Mvcc Version
 
       // Negate this comparison so later edits show up first
-      return Longs.compare(right.getMvccVersion(), left.getMvccVersion());
+      long leftChangeSeqNum = getReplaySeqNum(left);
+      if (leftChangeSeqNum < 0) {
+        leftChangeSeqNum = left.getMvccVersion();
+      }
+      long rightChangeSeqNum = getReplaySeqNum(right);
+      if (rightChangeSeqNum < 0) {
+        rightChangeSeqNum = right.getMvccVersion();
+      }
+      return Longs.compare(rightChangeSeqNum, leftChangeSeqNum);
+    }
+
+    /**
+     * Return the replay log sequence number for the cell
+     * @param c the cell to inspect
+     * @return -1 if no LOG_REPLAY_TAG exists
+     */
+    private long getReplaySeqNum(final Cell c) {
+      Tag tag = Tag.getTag(c.getTagsArray(), c.getTagsOffset(), c.getTagsLength(),
+          TagType.LOG_REPLAY_TAG_TYPE.value());
+
+      if (tag != null) {
+        return Bytes.toLong(tag.getBuffer(), tag.getTagOffset(), tag.getTagLength());
+      }
+      return -1;
     }
 
     public int compareTimestamps(final KeyValue left, final KeyValue right) {
@@ -2735,6 +2758,27 @@ public class KeyValue implements Cell, HeapSize, Cloneable {
     in.readFully(bytes);
     return new KeyValue(bytes, 0, length);
   }
+
+  /**
+   * Create a new KeyValue by copying an existing cell and adding new tags
+   * @param c the cell to copy
+   * @param newTags the tags to append to the cell's existing tags
+   * @return the new KeyValue
+   */
+  public static KeyValue addTags(Cell c, List<Tag> newTags) {
+    List<Tag> existingTags = null;
+    if (c.getTagsLength() > 0) {
+      existingTags = Tag.asList(c.getTagsArray(), c.getTagsOffset(), c.getTagsLength());
+      existingTags.addAll(newTags);
+    } else {
+      existingTags = newTags;
+    }
+    return new KeyValue(c.getRowArray(), c.getRowOffset(), (int) c.getRowLength(),
+        c.getFamilyArray(), c.getFamilyOffset(), (int) c.getFamilyLength(),
+        c.getQualifierArray(), c.getQualifierOffset(), (int) c.getQualifierLength(),
+        c.getTimestamp(), Type.codeToType(c.getTypeByte()), c.getValueArray(), c.getValueOffset(),
+        c.getValueLength(), existingTags);
+  }
 
   /**
    * Create a KeyValue reading from the raw InputStream.
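The comparator change above makes replayed edits sort by the sequence number carried in their LOG_REPLAY_TAG, falling back to the mvcc version when no tag is present, and negates the comparison so later edits sort first. The following is a minimal standalone sketch of that ordering rule, in plain Java with hypothetical values; it is an illustration, not part of the patch:

import java.util.Arrays;
import java.util.Comparator;

public class ReplayOrderSketch {
  // {replaySeqNum, mvccVersion}; -1 plays the role of "no LOG_REPLAY_TAG on this cell"
  static final long[][] CELLS = { { -1, 5 }, { 12, 3 }, { -1, 9 } };

  // Prefer the replay sequence number when present, else fall back to mvcc
  static long effectiveSeq(long[] cell) {
    return cell[0] < 0 ? cell[1] : cell[0];
  }

  public static void main(String[] args) {
    Integer[] order = { 0, 1, 2 };
    Arrays.sort(order, new Comparator<Integer>() {
      @Override
      public int compare(Integer a, Integer b) {
        // arguments swapped, so later (larger) sequence numbers sort first
        return Long.compare(effectiveSeq(CELLS[b]), effectiveSeq(CELLS[a]));
      }
    });
    System.out.println(Arrays.toString(order)); // [1, 2, 0]
  }
}

Cell 1 wins on its replay sequence number (12) even though its mvcc version (3) is the smallest, which is exactly the point: replay order follows the original WAL order, not the order edits happen to get re-applied in.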
diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/Tag.java hbase-common/src/main/java/org/apache/hadoop/hbase/Tag.java
index ad0c595..aa47574 100644
--- hbase-common/src/main/java/org/apache/hadoop/hbase/Tag.java
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/Tag.java
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.commons.lang.mutable.MutableInt;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -155,6 +156,26 @@ public class Tag {
     }
     return tags;
   }
+
+  /**
+   * Retrieve the first tag from the tags byte array matching the passed in tag type
+   * @param b the tags byte array
+   * @param offset offset of the tags within b
+   * @param length total length of the tags
+   * @param type the tag type to look for
+   * @return null if there is no tag of the passed in tag type
+   */
+  public static Tag getTag(byte[] b, int offset, int length, byte type) {
+    int pos = offset;
+    while (pos < offset + length) {
+      short tagLen = Bytes.toShort(b, pos);
+      if (b[pos + TAG_LENGTH_SIZE] == type) {
+        return new Tag(b, pos, (short) (tagLen + TAG_LENGTH_SIZE));
+      }
+      pos += TAG_LENGTH_SIZE + tagLen;
+    }
+    return null;
+  }
 
   /**
    * Returns the total length of the entire tag entity
diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
new file mode 100644
index 0000000..0827a1d
--- /dev/null
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public enum TagType {
+  // Please declare new Tag Types here to avoid stepping on pre-existing tag types.
+  VISIBILITY_TAG_TYPE((byte) 2),
+  LOG_REPLAY_TAG_TYPE((byte) 3);
+
+  private byte value;
+
+  private TagType(byte inputValue) {
+    this.value = inputValue;
+  }
+
+  public byte value() {
+    return value;
+  }
+}
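Tag.getTag() walks tags serialized as a 2-byte length, then a 1-byte type, then the payload, where the length field covers the type byte plus the payload (hence the `tagLen + TAG_LENGTH_SIZE` total above). A self-contained sketch of that walk over a hand-built buffer, JDK-only and with hypothetical payloads; the real Tag class adds bounds checks this sketch omits:

import java.nio.ByteBuffer;

public class TagScanSketch {
  static final int TAG_LENGTH_SIZE = 2; // matches Tag.TAG_LENGTH_SIZE

  // Returns the offset of the first tag with the given type, or -1 if none
  static int findTag(byte[] b, int offset, int length, byte type) {
    int pos = offset;
    while (pos < offset + length) {
      short tagLen = ByteBuffer.wrap(b, pos, 2).getShort(); // length = type byte + payload
      if (b[pos + TAG_LENGTH_SIZE] == type) {
        return pos;
      }
      pos += TAG_LENGTH_SIZE + tagLen; // skip length field plus tag body
    }
    return -1;
  }

  public static void main(String[] args) {
    // Two tags: type 2 with payload "x", type 3 with payload "yz"
    ByteBuffer buf = ByteBuffer.allocate(9);
    buf.putShort((short) 2).put((byte) 2).put((byte) 'x');
    buf.putShort((short) 3).put((byte) 3).put((byte) 'y').put((byte) 'z');
    System.out.println(findTag(buf.array(), 0, 9, (byte) 3)); // prints 4
  }
}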
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index d770555..3ea033d 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -195,7 +195,7 @@ public class HRegion implements HeapSize { // , Writable{
 
   public static final String LOAD_CFS_ON_DEMAND_CONFIG_KEY =
     "hbase.hregion.scan.loadColumnFamiliesOnDemand";
-  
+
   /**
    * This is the global default value for durability. All tables/mutations not
    * defining a durability or using USE_DEFAULT will default to this value.
@@ -210,7 +210,7 @@ public class HRegion implements HeapSize { // , Writable{
    */
   final AtomicBoolean closing = new AtomicBoolean(false);
 
-  protected long completeSequenceId = -1L;
+  protected volatile long completeSequenceId = -1L;
 
   /**
    * Region level sequence Id. It is used for appending WALEdits in HLog. Its default value is -1,
@@ -399,6 +399,8 @@ public class HRegion implements HeapSize { // , Writable{
   private RegionServerAccounting rsAccounting;
   private List<Pair<Long, Long>> recentFlushes = new ArrayList<Pair<Long, Long>>();
   private long flushCheckInterval;
+  // flushPerChanges bounds how many unflushed changes may accumulate in the memstore
+  private long flushPerChanges;
   private long blockingMemStoreSize;
   final long threadWakeFrequency;
   // Used to guard closes
@@ -493,6 +495,12 @@ public class HRegion implements HeapSize { // , Writable{
         .addWritableMap(htd.getValues());
     this.flushCheckInterval = conf.getInt(MEMSTORE_PERIODIC_FLUSH_INTERVAL,
         DEFAULT_CACHE_FLUSH_INTERVAL);
+    this.flushPerChanges = conf.getLong(MEMSTORE_FLUSH_PER_CHANGES, DEFAULT_FLUSH_PER_CHANGES);
+    if (this.flushPerChanges > MAX_FLUSH_PER_CHANGES) {
+      throw new IllegalArgumentException(MEMSTORE_FLUSH_PER_CHANGES + " can not exceed "
+          + MAX_FLUSH_PER_CHANGES);
+    }
+
     this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration",
         DEFAULT_ROWLOCK_WAIT_DURATION);
@@ -651,6 +659,12 @@ public class HRegion implements HeapSize { // , Writable{
     // Use maximum of log sequenceid or that which was found in stores
     // (particularly if no recovered edits, seqid will be -1).
     long nextSeqid = maxSeqId + 1;
+    if (this.isRecovering) {
+      // In distributedLogReplay mode, we don't know the last change sequence number because the
+      // region is opened before recovery completes. So we add a safety bumper to avoid new
+      // sequence numbers overlapping already-used ones.
+      nextSeqid += this.flushPerChanges + 10000000; // add an extra 10 million
+    }
     LOG.info("Onlined " + this.getRegionInfo().getShortNameToLog() +
       "; next sequenceid=" + nextSeqid);
@@ -658,6 +672,7 @@ public class HRegion implements HeapSize { // , Writable{
 
     this.closing.set(false);
     this.closed.set(false);
+    this.completeSequenceId = nextSeqid;
     if (coprocessorHost != null) {
       status.setStatus("Running coprocessor post-open hooks");
       coprocessorHost.postOpen();
@@ -947,6 +962,16 @@ public class HRegion implements HeapSize { // , Writable{
   /** Default interval for the memstore flush */
   public static final int DEFAULT_CACHE_FLUSH_INTERVAL = 3600000;
 
+  /** Conf key to force a flush if there are already enough changes for one region in memstore */
+  public static final String MEMSTORE_FLUSH_PER_CHANGES =
+      "hbase.regionserver.flush.per.changes";
+  public static final long DEFAULT_FLUSH_PER_CHANGES = 30000000; // 30 million
+  /**
+   * The following MAX_FLUSH_PER_CHANGES is large enough because each KeyValue carries 20+ bytes
+   * of overhead, so even 1G of empty KVs occupies at least 20GB of memstore for a single region.
+   */
+  public static final long MAX_FLUSH_PER_CHANGES = 1000000000; // 1G
+
   /**
    * Close down this HRegion.  Flush the cache unless abort parameter is true,
    * Shut down each HStore, don't service any more calls.
@@ -1447,6 +1472,9 @@ public class HRegion implements HeapSize { // , Writable{
    * Should the memstore be flushed now
    */
  boolean shouldFlush() {
+    if (this.completeSequenceId + this.flushPerChanges < this.sequenceId.get()) {
+      return true;
+    }
     if (flushCheckInterval <= 0) { //disabled
       return false;
     }
@@ -1657,9 +1685,7 @@ public class HRegion implements HeapSize { // , Writable{
     this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis();
 
     // Update the last flushed sequence id for region
-    if (this.rsServices != null) {
-      completeSequenceId = flushSeqId;
-    }
+    completeSequenceId = flushSeqId;
 
     // C. Finally notify anyone waiting on memstore to clear:
     // e.g. checkResources().
@@ -5232,7 +5258,7 @@ public class HRegion implements HeapSize { // , Writable{
       ClassSize.OBJECT +
       ClassSize.ARRAY +
       41 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
-      (11 * Bytes.SIZEOF_LONG) +
+      (12 * Bytes.SIZEOF_LONG) +
       5 * Bytes.SIZEOF_BOOLEAN);
 
   // woefully out of date - currently missing:
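The new shouldFlush() short-circuit forces a flush once more than flushPerChanges edits have accumulated since the last flush, which is what bounds the gap the region-open sequence-number bumper above relies on. A standalone sketch of the trigger arithmetic; names mirror HRegion, but this is an illustration, not HBase code:

import java.util.concurrent.atomic.AtomicLong;

public class FlushPerChangesSketch {
  static final long FLUSH_PER_CHANGES = 30000000L; // as DEFAULT_FLUSH_PER_CHANGES

  // last flushed sequence id, as completeSequenceId in HRegion
  volatile long completeSequenceId = 100L;
  // current region sequence id, as sequenceId in HRegion
  final AtomicLong sequenceId = new AtomicLong(100L);

  boolean shouldFlushForChanges() {
    // same arithmetic as the new check in shouldFlush()
    return completeSequenceId + FLUSH_PER_CHANGES < sequenceId.get();
  }

  public static void main(String[] args) {
    FlushPerChangesSketch region = new FlushPerChangesSketch();
    region.sequenceId.set(100L + FLUSH_PER_CHANGES);
    System.out.println(region.shouldFlushForChanges()); // false, exactly at the limit
    region.sequenceId.incrementAndGet();
    System.out.println(region.shouldFlushForChanges()); // true, one change past it
  }
}

This also explains why completeSequenceId becomes volatile and is now updated on every flush rather than only when rsServices is set: shouldFlush() reads it from other threads and the check must stay live.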
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
index ddd928c..3c68e5b 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogSplitter.java
@@ -65,6 +65,9 @@ import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
+import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.client.ConnectionUtils;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HConnection;
@@ -73,6 +76,7 @@ import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
 import org.apache.hadoop.hbase.io.HeapSize;
+import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.master.SplitLogManager;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
@@ -163,6 +167,8 @@ public class HLogSplitter {
   // Min batch size when replay WAL edits
   private final int minBatchSize;
 
+  private static boolean bTagSupportEnabled = false;
+
   HLogSplitter(Configuration conf, Path rootDir, FileSystem fs,
       LastSequenceId idChecker, ZooKeeperWatcher zkw) {
     this.conf = conf;
@@ -191,6 +197,12 @@ public class HLogSplitter {
       this.distributedLogReplay = false;
       outputSink = new LogRecoveredEditsOutputSink(numWriterThreads);
     }
+
+    // Tags are supported since HFile V3
+    bTagSupportEnabled = (HFile.getFormatVersion(conf) >= 3);
+    if (this.distributedLogReplay && bTagSupportEnabled) {
+      LOG.info("Tag replay edits with LOG_REPLAY_TAG");
+    }
   }
 
   /**
@@ -1483,6 +1495,7 @@ public class HLogSplitter {
         if (!skippedKVs.isEmpty()) {
           kvs.removeAll(skippedKVs);
         }
+
         synchronized (serverToBufferQueueMap) {
           locKey = loc.getHostnamePort() + KEY_DELIMITER + table;
           List<Pair<HRegionLocation, HLog.Entry>> queue = serverToBufferQueueMap.get(locKey);
@@ -1851,6 +1864,37 @@ public class HLogSplitter {
     public final long nonce;
   }
 
+  /**
+   * Tag each edit to be replayed with its original log sequence number
+   * @param entry the WALEntry whose key carries the original sequence number
+   * @param cell the cell to tag
+   * @return the tagged cell, or the original cell if no tag is needed
+   */
+  private static Cell tagReplayLogSequenceNumber(WALEntry entry, Cell cell) {
+    // Tag puts with the original sequence number if there is no LOG_REPLAY_TAG yet
+    boolean needAddRecoveryTag = false;
+    if (bTagSupportEnabled) {
+      needAddRecoveryTag = true;
+      if (cell.getTagsLength() > 0) {
+        Tag tmpTag = Tag.getTag(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength(),
+            TagType.LOG_REPLAY_TAG_TYPE.value());
+        if (tmpTag != null) {
+          // found an existing log replay tag so reuse it
+          needAddRecoveryTag = false;
+        }
+      }
+    }
+    if (needAddRecoveryTag) {
+      List<Tag> newTags = new ArrayList<Tag>();
+      Tag replayTag = new Tag(TagType.LOG_REPLAY_TAG_TYPE.value(),
+          Bytes.toBytes(entry.getKey().getLogSequenceNumber()));
+      newTags.add(replayTag);
+      return KeyValue.addTags(cell, newTags);
+    }
+    return cell;
+  }
+
   /**
    * This function is used to construct mutations from a WALEntry. It also reconstructs HLogKey &
    * WALEdit from the passed in WALEntry
@@ -1907,7 +1951,8 @@ public class HLogSplitter {
         if (CellUtil.isDelete(cell)) {
           ((Delete) m).addDeleteMarker(KeyValueUtil.ensureKeyValue(cell));
         } else {
-          ((Put) m).add(KeyValueUtil.ensureKeyValue(cell));
+          Cell tmpNewCell = tagReplayLogSequenceNumber(entry, cell);
+          ((Put) m).add(KeyValueUtil.ensureKeyValue(tmpNewCell));
         }
         previousCell = cell;
       }
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityUtils.java hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityUtils.java
index e11b972..8fc5d54 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityUtils.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityUtils.java
@@ -23,6 +23,7 @@ import java.util.Map.Entry;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TagType;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.MultiUserAuthorizations;
@@ -43,7 +44,7 @@ public class VisibilityUtils {
   public static final String VISIBILITY_LABEL_GENERATOR_CLASS =
       "hbase.regionserver.scan.visibility.label.generator.class";
 
-  public static final byte VISIBILITY_TAG_TYPE = (byte) 2;
+  public static final byte VISIBILITY_TAG_TYPE = TagType.VISIBILITY_TAG_TYPE.value();
   public static final String SYSTEM_LABEL = "system";
 
   /**
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index 99a8f02..be16e30 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -962,11 +962,6 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
       hbaseAdmin = null;
     }
 
-    if (zooKeeperWatcher != null) {
-      zooKeeperWatcher.close();
-      zooKeeperWatcher = null;
-    }
-
     // unset the configuration for MIN and MAX RS to start
     conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
     conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1);
@@ -976,6 +971,11 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
       this.hbaseCluster.waitUntilShutDown();
       this.hbaseCluster = null;
     }
+
+    if (zooKeeperWatcher != null) {
+      zooKeeperWatcher.close();
+      zooKeeperWatcher = null;
+    }
   }
 
   /**
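Putting the new pieces together: during replay, each Put cell is tagged with its original WAL sequence number unless a LOG_REPLAY_TAG is already present from an earlier replay attempt. A usage sketch against the APIs added in this patch; it assumes the patched Tag, TagType and KeyValue classes are on the classpath, and the sequence number would come from the entry's HLogKey exactly as in tagReplayLogSequenceNumber above:

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.util.Bytes;

public class ReplayTagUsage {
  // originalSeqNum would come from entry.getKey().getLogSequenceNumber()
  static Cell tagIfNeeded(Cell cell, long originalSeqNum) {
    Tag existing = Tag.getTag(cell.getTagsArray(), cell.getTagsOffset(),
        cell.getTagsLength(), TagType.LOG_REPLAY_TAG_TYPE.value());
    if (existing != null) {
      return cell; // already tagged by an earlier replay attempt, keep the oldest number
    }
    List<Tag> newTags = new ArrayList<Tag>();
    newTags.add(new Tag(TagType.LOG_REPLAY_TAG_TYPE.value(),
        Bytes.toBytes(originalSeqNum)));
    return KeyValue.addTags(cell, newTags); // copies the cell with the tag appended
  }
}

The tag, rather than the mvcc version, is what the KeyValue comparator change at the top of this patch reads back, so edits re-applied out of order still resolve in original WAL order.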
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
index 2b4e4f4..e81da00 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
@@ -70,17 +70,20 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.SplitLogCounters;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.NonceGenerator;
 import org.apache.hadoop.hbase.client.PerClientRandomNonceGenerator;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
 import org.apache.hadoop.hbase.exceptions.OperationConflictException;
 import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
 import org.apache.hadoop.hbase.master.SplitLogManager.TaskBatch;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
@@ -1159,6 +1162,198 @@ public class TestDistributedLogSplitting {
     zkw.close();
   }
 
+  @Test(timeout = 300000)
+  public void testSameVersionUpdatesRecovery() throws Exception {
+    LOG.info("testSameVersionUpdatesRecovery");
+    conf.setLong("hbase.regionserver.hlog.blocksize", 15 * 1024);
+    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
+    conf.setInt("hfile.format.version", 3);
+    startCluster(NUM_RS);
+    final AtomicLong sequenceId = new AtomicLong(100);
+    final int NUM_REGIONS_TO_CREATE = 40;
+    final int NUM_LOG_LINES = 1000;
+    // turn off load balancing to prevent regions from moving around;
+    // otherwise they will consume recovered.edits
+    master.balanceSwitch(false);
+
+    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
+    final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
+    HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
+
+    List<HRegionInfo> regions = null;
+    HRegionServer hrs = null;
+    for (int i = 0; i < NUM_RS; i++) {
+      boolean isCarryingMeta = false;
+      hrs = rsts.get(i).getRegionServer();
+      regions = ProtobufUtil.getOnlineRegions(hrs);
+      for (HRegionInfo region : regions) {
+        if (region.isMetaRegion()) {
+          isCarryingMeta = true;
+          break;
+        }
+      }
+      if (isCarryingMeta) {
+        continue;
+      }
+      break;
+    }
+
+    LOG.info("#regions = " + regions.size());
+    Iterator<HRegionInfo> it = regions.iterator();
+    while (it.hasNext()) {
+      HRegionInfo region = it.next();
+      if (region.isMetaTable()
+          || region.getEncodedName().equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName())) {
+        it.remove();
+      }
+    }
+    if (regions.size() == 0) return;
+    HRegionInfo curRegionInfo = regions.get(0);
+    byte[] startRow = curRegionInfo.getStartKey();
+    if (startRow == null || startRow.length == 0) {
+      startRow = new byte[] { 0, 0, 0, 0, 1 };
+    }
+    byte[] row = Bytes.incrementBytes(startRow, 1);
+    // use the last 5 bytes because HBaseTestingUtility.createMultiRegions uses 5-byte keys
+    row = Arrays.copyOfRange(row, 3, 8);
+    long value = 0;
+    byte[] tableName = Bytes.toBytes("table");
+    byte[] family = Bytes.toBytes("family");
+    byte[] qualifier = Bytes.toBytes("c1");
+    long timeStamp = System.currentTimeMillis();
+    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
+    htd.addFamily(new HColumnDescriptor(family));
+    for (int i = 0; i < NUM_LOG_LINES; i += 1) {
+      WALEdit e = new WALEdit();
+      value++;
+      e.add(new KeyValue(row, family, qualifier, timeStamp, Bytes.toBytes(value)));
+      hrs.getWAL().append(curRegionInfo, TableName.valueOf(tableName), e,
+          System.currentTimeMillis(), htd, sequenceId);
+    }
+    hrs.getWAL().sync();
+    hrs.getWAL().close();
+
+    // wait for the RS abort and recovery to complete
+    this.abortRSAndWaitForRecovery(hrs, zkw, NUM_REGIONS_TO_CREATE);
+
+    // verify we got the last value
+    LOG.info("Verification Starts...");
+    Get g = new Get(row);
+    Result r = ht.get(g);
+    long theStoredVal = Bytes.toLong(r.getValue(family, qualifier));
+    assertEquals(value, theStoredVal);
+
+    // after flush
+    LOG.info("Verification after flush...");
+    TEST_UTIL.getHBaseAdmin().flush(tableName);
+    r = ht.get(g);
+    theStoredVal = Bytes.toLong(r.getValue(family, qualifier));
+    assertEquals(value, theStoredVal);
+    ht.close();
+  }
+
+  @Test(timeout = 300000)
+  public void testSameVersionUpdatesRecoveryWithCompaction() throws Exception {
+    LOG.info("testSameVersionUpdatesRecoveryWithCompaction");
+    conf.setLong("hbase.regionserver.hlog.blocksize", 15 * 1024);
+    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
+    conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 30 * 1024);
+    conf.setInt("hbase.hstore.compactionThreshold", 3);
+    conf.setInt("hfile.format.version", 3);
+    startCluster(NUM_RS);
+    final AtomicLong sequenceId = new AtomicLong(100);
+    final int NUM_REGIONS_TO_CREATE = 40;
+    final int NUM_LOG_LINES = 1000;
+    // turn off load balancing to prevent regions from moving around;
+    // otherwise they will consume recovered.edits
+    master.balanceSwitch(false);
+
+    List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
+    final ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "table-creation", null);
+    HTable ht = installTable(zkw, "table", "family", NUM_REGIONS_TO_CREATE);
+
+    List<HRegionInfo> regions = null;
+    HRegionServer hrs = null;
+    for (int i = 0; i < NUM_RS; i++) {
+      boolean isCarryingMeta = false;
+      hrs = rsts.get(i).getRegionServer();
+      regions = ProtobufUtil.getOnlineRegions(hrs);
+      for (HRegionInfo region : regions) {
+        if (region.isMetaRegion()) {
+          isCarryingMeta = true;
+          break;
+        }
+      }
+      if (isCarryingMeta) {
+        continue;
+      }
+      break;
+    }
+
+    LOG.info("#regions = " + regions.size());
+    Iterator<HRegionInfo> it = regions.iterator();
+    while (it.hasNext()) {
+      HRegionInfo region = it.next();
+      if (region.isMetaTable()
+          || region.getEncodedName().equals(HRegionInfo.FIRST_META_REGIONINFO.getEncodedName())) {
+        it.remove();
+      }
+    }
+    if (regions.size() == 0) return;
+    HRegionInfo curRegionInfo = regions.get(0);
+    byte[] startRow = curRegionInfo.getStartKey();
+    if (startRow == null || startRow.length == 0) {
+      startRow = new byte[] { 0, 0, 0, 0, 1 };
+    }
+    byte[] row = Bytes.incrementBytes(startRow, 1);
+    // use the last 5 bytes because HBaseTestingUtility.createMultiRegions uses 5-byte keys
+    row = Arrays.copyOfRange(row, 3, 8);
+    long value = 0;
+    final byte[] tableName = Bytes.toBytes("table");
+    byte[] family = Bytes.toBytes("family");
+    byte[] qualifier = Bytes.toBytes("c1");
+    long timeStamp = System.currentTimeMillis();
+    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
+    htd.addFamily(new HColumnDescriptor(family));
+    for (int i = 0; i < NUM_LOG_LINES; i += 1) {
+      WALEdit e = new WALEdit();
+      value++;
+      e.add(new KeyValue(row, family, qualifier, timeStamp, Bytes.toBytes(value)));
+      hrs.getWAL().append(curRegionInfo, TableName.valueOf(tableName), e,
+          System.currentTimeMillis(), htd, sequenceId);
+    }
+    hrs.getWAL().sync();
+    hrs.getWAL().close();
+
+    // wait for the RS abort and recovery to complete
+    this.abortRSAndWaitForRecovery(hrs, zkw, NUM_REGIONS_TO_CREATE);
+
+    // verify we got the last value
+    LOG.info("Verification Starts...");
+    Get g = new Get(row);
+    Result r = ht.get(g);
+    long theStoredVal = Bytes.toLong(r.getValue(family, qualifier));
+    assertEquals(value, theStoredVal);
+
+    // after flush & compaction
+    LOG.info("Verification after flush...");
+    TEST_UTIL.getHBaseAdmin().flush(tableName);
+    TEST_UTIL.getHBaseAdmin().compact(tableName);
+
+    // wait for compaction to complete
+    TEST_UTIL.waitFor(30000, 200, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        return (TEST_UTIL.getHBaseAdmin().getCompactionState(tableName) == CompactionState.NONE);
+      }
+    });
+
+    r = ht.get(g);
+    theStoredVal = Bytes.toLong(r.getValue(family, qualifier));
+    assertEquals(value, theStoredVal);
+    ht.close();
+  }
+
   HTable installTable(ZooKeeperWatcher zkw, String tname, String fname, int nrs) throws Exception {
     return installTable(zkw, tname, fname, nrs, 0);
   }
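Both tests flip the same switches the feature depends on: distributed log replay, plus HFile format version 3 so tags can be persisted (without v3, bTagSupportEnabled stays false and replayed edits go untagged). A sketch collecting that configuration in one place, in programmatic form; in a deployment the same keys would normally be set in hbase-site.xml:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;

public class ReplayTaggingConfig {
  public static Configuration create() {
    Configuration conf = HBaseConfiguration.create();
    // enable distributed log replay, as both tests above do
    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
    // tags are only written from HFile format version 3 on
    conf.setInt("hfile.format.version", 3);
    // optional: the forced-flush change count introduced in this patch;
    // values above MAX_FLUSH_PER_CHANGES (1G) are rejected at region open
    conf.setLong("hbase.regionserver.flush.per.changes", 30000000L);
    return conf;
  }
}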