diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index aef95d8..a6d605a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -2495,26 +2495,7 @@ public class HRegion implements HeapSize { // , Writable{ } // ------------------------------------ - // STEP 3. Write back to memstore - // Write to memstore. It is ok to write to memstore - // first without updating the HLog because we do not roll - // forward the memstore MVCC. The MVCC will be moved up when - // the complete operation is done. These changes are not yet - // visible to scanners till we update the MVCC. The MVCC is - // moved only when the sync is complete. - // ---------------------------------- - long addedSize = 0; - for (int i = firstIndex; i < lastIndexExclusive; i++) { - if (batchOp.retCodeDetails[i].getOperationStatusCode() - != OperationStatusCode.NOT_RUN) { - continue; - } - doRollBackMemstore = true; // If we have a failure, we need to clean what we wrote - addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, memstoreCells); - } - - // ------------------------------------ - // STEP 4. Build WAL edit + // STEP 3. Build WAL edit // ---------------------------------- Durability durability = Durability.USE_DEFAULT; for (int i = firstIndex; i < lastIndexExclusive; i++) { @@ -2569,7 +2550,7 @@ public class HRegion implements HeapSize { // , Writable{ } // ------------------------- - // STEP 5. Append the final edit to WAL. Do not sync wal. + // STEP 4. Append the final edit to WAL. Do not sync wal. 
// ------------------------- Mutation mutation = batchOp.getMutation(firstIndex); if (walEdit.size() > 0) { @@ -2583,6 +2564,29 @@ public class HRegion implements HeapSize { // , Writable{ // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned walKey = this.appendNoSyncNoAppend(this.log, memstoreCells); } + // Wait on sequence number to be assigned before adding to memstore. This method will + // block until the edit has had its sequence number assigned. We need the sequence number + // before we can add to memstore. + if (walKey.getSequenceNumber() == HLog.NO_SEQUENCE_ID) throw new IllegalStateException(); + + // ------------------------------------ + // STEP 5. Write back to memstore + // Write to memstore. The edit was appended to the WAL in + // STEP 4 but is not yet synced. That is ok because we do not roll + // forward the memstore MVCC. The MVCC will be moved up when + // the complete operation is done. These changes are not yet + // visible to scanners till we update the MVCC. The MVCC is + // moved only when the sync is complete. + // ---------------------------------- + long addedSize = 0; + for (int i = firstIndex; i < lastIndexExclusive; i++) { + if (batchOp.retCodeDetails[i].getOperationStatusCode() + != OperationStatusCode.NOT_RUN) { + continue; + } + doRollBackMemstore = true; // If we have a failure, we need to clean what we wrote + addedSize += applyFamilyMapToMemstore(familyMaps[i], mvccNum, memstoreCells); + } // ------------------------------- // STEP 6. Release row locks, etc. 
@@ -2599,7 +2603,7 @@ public class HRegion implements HeapSize { // , Writable{ if (txid != 0) { syncOrDefer(txid, durability); } - + doRollBackMemstore = false; // calling the post CP hook for batch mutation if (!isInReplay && coprocessorHost != null) { @@ -4851,8 +4855,7 @@ public class HRegion implements HeapSize { // , Writable{ if (processor.readOnly()) { try { long now = EnvironmentEdgeManager.currentTimeMillis(); - doProcessRowWithTimeout( - processor, now, this, null, null, timeout); + doProcessRowWithTimeout(processor, now, this, null, null, timeout); processor.postProcess(this, walEdit); } catch (IOException e) { throw e; @@ -4886,29 +4889,15 @@ public class HRegion implements HeapSize { // , Writable{ long now = EnvironmentEdgeManager.currentTimeMillis(); try { - // 4. Let the processor scan the rows, generate mutations and add - // waledits - doProcessRowWithTimeout( - processor, now, this, mutations, walEdit, timeout); + // 4. Let the processor scan the rows, generate mutations and add waledits + doProcessRowWithTimeout(processor, now, this, mutations, walEdit, timeout); if (!mutations.isEmpty()) { // 5. Get a mvcc write number writeEntry = mvcc.beginMemstoreInsertWithSeqNum(mvccNum); - // 6. Apply to memstore - for (KeyValue kv : mutations) { - kv.setMvccVersion(mvccNum); - Store store = getStore(kv); - if (store == null) { - checkFamily(CellUtil.cloneFamily(kv)); - // unreachable - } - Pair ret = store.add(kv); - addedSize += ret.getFirst(); - memstoreCells.add(KeyValueUtil.ensureKeyValue(ret.getSecond())); - } long txid = 0; - // 7. Append no sync + // 6. 
Append no sync if (!walEdit.isEmpty()) { walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, now, @@ -4921,6 +4910,23 @@ public class HRegion implements HeapSize { // , Writable{ // to get a sequence id assigned which is done by FSWALEntry#stampRegionSequenceId walKey = this.appendNoSyncNoAppend(this.log, memstoreCells); } + // Wait on sequence number to be assigned before adding to memstore. This method will + // block until the edit has had its sequence number assigned. We need the sequence number + // before we can add to memstore. + if (walKey.getSequenceNumber() == HLog.NO_SEQUENCE_ID) throw new IllegalStateException(); + + // 7. Apply to memstore + for (KeyValue kv : mutations) { + kv.setMvccVersion(mvccNum); + Store store = getStore(kv); + if (store == null) { + checkFamily(CellUtil.cloneFamily(kv)); + // unreachable + } + Pair ret = store.add(kv); + addedSize += ret.getFirst(); + memstoreCells.add(KeyValueUtil.ensureKeyValue(ret.getSecond())); + } // 8. Release region lock if (locked) { @@ -5163,7 +5169,29 @@ public class HRegion implements HeapSize { // , Writable{ tempMemstore.put(store, kvs); } - //Actually write to Memstore now + // Actually write to WAL now + if (writeToWAL) { + // Using default cluster id, as this can only happen in the originating + // cluster. A slave cluster receives the final value (not the delta) + // as a Put. 
+ walKey = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), + this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, nonceGroup, nonce); + txid = this.log.appendNoSync(this.htableDescriptor, getRegionInfo(), walKey, walEdits, + this.sequenceId, true, memstoreCells); + } else { + recordMutationWithoutWal(append.getFamilyCellMap()); + } + if (walKey == null){ + // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned + walKey = this.appendNoSyncNoAppend(this.log, memstoreCells); + } + + // Wait on sequence number to be assigned before adding to memstore. This method will + // block until the edit has had its sequence number assigned. We need the sequence number + // before we can add to memstore. + if (walKey.getSequenceNumber() == HLog.NO_SEQUENCE_ID) throw new IllegalStateException(); + + // Actually write to Memstore now for (Map.Entry> entry : tempMemstore.entrySet()) { Store store = entry.getKey(); if (store.getFamily().getMaxVersions() == 1) { @@ -5182,24 +5210,7 @@ public class HRegion implements HeapSize { // , Writable{ } allKVs.addAll(entry.getValue()); } - - // Actually write to WAL now - if (writeToWAL) { - // Using default cluster id, as this can only happen in the originating - // cluster. A slave cluster receives the final value (not the delta) - // as a Put. 
- walKey = new HLogKey(getRegionInfo().getEncodedNameAsBytes(), - this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, nonceGroup, nonce); - txid = this.log.appendNoSync(this.htableDescriptor, getRegionInfo(), walKey, walEdits, - this.sequenceId, true, memstoreCells); - } else { - recordMutationWithoutWal(append.getFamilyCellMap()); - } - if(walKey == null){ - // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned - walKey = this.appendNoSyncNoAppend(this.log, memstoreCells); - } - + size = this.addAndGetGlobalMemstoreSize(size); flush = isFlushSize(size); } finally { @@ -5295,8 +5306,7 @@ public class HRegion implements HeapSize { // , Writable{ w = mvcc.beginMemstoreInsertWithSeqNum(mvccNum); long now = EnvironmentEdgeManager.currentTimeMillis(); // Process each family - for (Map.Entry> family: - increment.getFamilyCellMap().entrySet()) { + for (Map.Entry> family: increment.getFamilyCellMap().entrySet()) { Store store = stores.get(family.getKey()); List kvs = new ArrayList(family.getValue().size()); @@ -5377,12 +5387,35 @@ public class HRegion implements HeapSize { // , Writable{ } } - //store the kvs to the temporary memstore before writing HLog + // Store the kvs to a temporary memstore if (!kvs.isEmpty()) { tempMemstore.put(store, kvs); } } + // Write to WAL now + if (walEdits != null && !walEdits.isEmpty()) { + if (writeToWAL) { + // Using default cluster id, as this can only happen in the originating + // cluster. A slave cluster receives the final value (not the delta) + // as a Put. 
+ walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), + this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, nonceGroup, nonce); + txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(), + walKey, walEdits, getSequenceId(), true, memstoreCells); + } else { + recordMutationWithoutWal(increment.getFamilyCellMap()); + } + } + if(walKey == null){ + // Append a faked WALEdit in order for SKIP_WAL updates to get mvccNum assigned + walKey = this.appendNoSyncNoAppend(this.log, memstoreCells); + } + // Wait on sequence number to be assigned before adding to memstore. This method will + // block until the edit has had its sequence number assigned. We need the sequence number + // before we can add to memstore. + if (walKey.getSequenceNumber() == HLog.NO_SEQUENCE_ID) throw new IllegalStateException(); + //Actually write to Memstore now if (!tempMemstore.isEmpty()) { for (Map.Entry> entry : tempMemstore.entrySet()) { @@ -5405,25 +5438,6 @@ public class HRegion implements HeapSize { // , Writable{ size = this.addAndGetGlobalMemstoreSize(size); flush = isFlushSize(size); } - - // Actually write to WAL now - if (walEdits != null && !walEdits.isEmpty()) { - if (writeToWAL) { - // Using default cluster id, as this can only happen in the originating - // cluster. A slave cluster receives the final value (not the delta) - // as a Put. 
- walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(), - this.htableDescriptor.getTableName(), HLog.NO_SEQUENCE_ID, nonceGroup, nonce); - txid = this.log.appendNoSync(this.htableDescriptor, this.getRegionInfo(), - walKey, walEdits, getSequenceId(), true, memstoreCells); - } else { - recordMutationWithoutWal(increment.getFamilyCellMap()); - } - } - if(walKey == null){ - // Append a faked WALEdit in order for SKIP_WAL updates to get mvccNum assigned - walKey = this.appendNoSyncNoAppend(this.log, memstoreCells); - } } finally { this.updatesLock.readLock().unlock(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java index 44c2d9e..9a3c3f5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java @@ -116,16 +116,18 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool private final int syncInterval; private final HTableDescriptor htd; private final Sampler loopSampler; + private final boolean regionAPI; HLogPutBenchmark(final HRegion region, final HTableDescriptor htd, final long numIterations, final boolean noSync, final int syncInterval, - final double traceFreq) { + final double traceFreq, final boolean regionAPI) { this.numIterations = numIterations; this.noSync = noSync; this.syncInterval = syncInterval; this.numFamilies = htd.getColumnFamilies().length; this.region = region; this.htd = htd; + this.regionAPI = regionAPI; String spanReceivers = getConf().get("hbase.trace.spanreceiver.classes"); if (spanReceivers == null || spanReceivers.isEmpty()) { loopSampler = Sampler.NEVER; @@ -165,15 +167,19 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool try { long now = 
System.nanoTime(); Put put = setupPut(rand, key, value, numFamilies); - WALEdit walEdit = new WALEdit(); - addFamilyMapToWALEdit(put.getFamilyCellMap(), walEdit); - HRegionInfo hri = region.getRegionInfo(); - hlog.appendNoSync(hri, hri.getTable(), walEdit, clusters, now, htd, - region.getSequenceId(), true, nonce, nonce); - if (!this.noSync) { - if (++lastSync >= this.syncInterval) { - hlog.sync(); - lastSync = 0; + if (regionAPI) { + region.put(put); + } else { + WALEdit walEdit = new WALEdit(); + addFamilyMapToWALEdit(put.getFamilyCellMap(), walEdit); + HRegionInfo hri = region.getRegionInfo(); + hlog.appendNoSync(hri, hri.getTable(), walEdit, clusters, now, htd, + region.getSequenceId(), true, nonce, nonce); + if (!this.noSync) { + if (++lastSync >= this.syncInterval) { + hlog.sync(); + lastSync = 0; + } } } latencyHistogram.update(System.nanoTime() - now); @@ -209,11 +215,12 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool String spanReceivers = getConf().get("hbase.trace.spanreceiver.classes"); boolean trace = spanReceivers != null && !spanReceivers.isEmpty(); double traceFreq = 1.0; + boolean regionAPI = false; // Process command line args for (int i = 0; i < args.length; i++) { String cmd = args[i]; try { - if (cmd.equals("-threads")) { + if (cmd.equals("-threads") || cmd.equals("--threads")) { numThreads = Integer.parseInt(args[++i]); } else if (cmd.equals("-iterations")) { numIterations = Long.parseLong(args[++i]); @@ -247,8 +254,12 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool cipher = args[++i]; } else if (cmd.equals("-traceFreq")) { traceFreq = Double.parseDouble(args[++i]); + } else if (cmd.equals("-regionapi")) { + regionAPI = true; } else if (cmd.equals("-h")) { printUsageAndExit(); + } else if (cmd.equals("-help")) { + printUsageAndExit(); } else if (cmd.equals("--help")) { printUsageAndExit(); } else { @@ -368,9 +379,8 @@ public final class HLogPerformanceEvaluation extends 
Configured implements Tool region = openRegion(fs, rootRegionDir, htd, hlog); ConsoleReporter.enable(this.metrics, 30, TimeUnit.SECONDS); long putTime = - runBenchmark(Trace.wrap( - new HLogPutBenchmark(region, htd, numIterations, noSync, syncInterval, traceFreq)), - numThreads); + runBenchmark(Trace.wrap(new HLogPutBenchmark(region, htd, numIterations, noSync, + syncInterval, traceFreq, regionAPI)), numThreads); logBenchmarkResult("Summary: threads=" + numThreads + ", iterations=" + numIterations + ", syncInterval=" + syncInterval, numIterations * numThreads, putTime);