Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java	(revision 1177401)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java	(working copy)
@@ -3503,6 +3503,7 @@
     List<KeyValue> kvs = new ArrayList<KeyValue>(increment.numColumns());
     long now = EnvironmentEdgeManager.currentTimeMillis();
     long size = 0;
+    long txid = 0;
 
     // Lock row
     startRegionOperation();
@@ -3561,7 +3562,7 @@
           // Using default cluster id, as this can only happen in the orginating
           // cluster. A slave cluster receives the final value (not the delta)
           // as a Put.
-          this.log.append(regionInfo, this.htableDescriptor.getName(),
+          txid = this.log.appendNoSync(regionInfo, this.htableDescriptor.getName(),
             walEdits, HConstants.DEFAULT_CLUSTER_ID, now,
             this.htableDescriptor);
         }
@@ -3572,6 +3573,9 @@
         this.updatesLock.readLock().unlock();
         releaseRowLock(lid);
       }
+      if (writeToWAL) {
+        this.log.sync(txid); // sync the transaction log outside the rowlock
+      }
     } finally {
       closeRegionOperation();
     }
@@ -3599,6 +3603,7 @@
     checkRow(row, "increment");
     boolean flush = false;
     boolean wrongLength = false;
+    long txid = 0;
     // Lock row
     long result = amount;
     startRegionOperation();
@@ -3642,7 +3647,7 @@
             // Using default cluster id, as this can only happen in the
             // orginating cluster. A slave cluster receives the final value (not
             // the delta) as a Put.
-            this.log.append(regionInfo, this.htableDescriptor.getName(),
+            txid = this.log.appendNoSync(regionInfo, this.htableDescriptor.getName(),
               walEdit, HConstants.DEFAULT_CLUSTER_ID, now,
               this.htableDescriptor);
           }
@@ -3659,6 +3664,9 @@
         this.updatesLock.readLock().unlock();
         releaseRowLock(lid);
       }
+      if (writeToWAL) {
+        this.log.sync(txid); // sync the transaction log outside the rowlock
+      }
     } finally {
       closeRegionOperation();
    }
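Note (not part of the patch): both increment paths above now append the WAL edit without syncing while the row lock and updatesLock are held, remember the transaction id returned by appendNoSync(), and call sync(txid) only after the locks are released. The sketch below illustrates that ordering in isolation; the Wal interface, Edit class, and lock names are hypothetical stand-ins, not HBase APIs.

// Illustrative sketch only. Wal and Edit are hypothetical stand-ins for
// HLog and WALEdit; the row lock is reduced to a plain monitor.
interface Wal {
  long appendNoSync(Edit edit);   // buffer the edit, return its transaction id
  void sync(long txid);           // block until everything up to txid is durable
}

class Edit { }

class IncrementSketch {
  private final Wal log;
  private final Object rowLock = new Object();

  IncrementSketch(Wal log) {
    this.log = log;
  }

  void increment(Edit edit, boolean writeToWAL) {
    long txid = 0;
    synchronized (rowLock) {           // stands in for obtainRowLock()/updatesLock
      // apply the increment to the MemStore here (elided)
      if (writeToWAL) {
        txid = log.appendNoSync(edit); // cheap: only buffers, no HDFS round trip
      }
    }                                  // stands in for releaseRowLock(lid)
    if (writeToWAL) {
      log.sync(txid);                  // the slow HDFS sync happens outside the lock
    }
  }
}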
Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestIncrement.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/regionserver/TestIncrement.java	(revision 0)
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestIncrement.java	(revision 0)
@@ -0,0 +1,265 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestCase;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HDFSBlocksDistribution;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MultithreadedTestUtil;
+import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.NullComparator;
+import org.apache.hadoop.hbase.filter.PrefixFilter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
+import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
+import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.PairOfSameType;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+
+/**
+ * Testing of HRegion.incrementColumnValue
+ *
+ */
+public class TestIncrement extends HBaseTestCase {
+  static final Log LOG = LogFactory.getLog(TestIncrement.class);
+
+  HRegion region = null;
+  private final String DIR = HBaseTestingUtility.getTestDir() +
+    "/TestIncrement/";
+
+  private final int MAX_VERSIONS = 2;
+
+  // Test names
+  static final byte[] tableName = Bytes.toBytes("testtable");;
+  static final byte[] qual1 = Bytes.toBytes("qual1");
+  static final byte[] qual2 = Bytes.toBytes("qual2");
+  static final byte[] qual3 = Bytes.toBytes("qual3");
+  static final byte[] value1 = Bytes.toBytes("value1");
+  static final byte[] value2 = Bytes.toBytes("value2");
+  static final byte [] row = Bytes.toBytes("rowA");
+  static final byte [] row2 = Bytes.toBytes("rowB");
+
+  /**
+   * @see org.apache.hadoop.hbase.HBaseTestCase#setUp()
+   */
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+  }
+
+  @Override
+  protected void tearDown() throws Exception {
+    super.tearDown();
+    EnvironmentEdgeManagerTestHelper.reset();
+  }
+
+  //////////////////////////////////////////////////////////////////////////////
+  // New tests that doesn't spin up a mini cluster but rather just test the
+  // individual code pieces in the HRegion.
+  //////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Test one increment command.
+   */
+  public void testIncrementColumnValue() throws IOException {
+    LOG.info("Starting test testIncrementColumnValue");
+    initHRegion(tableName, getName(), fam1);
+
+    long value = 1L;
+    long amount = 3L;
+
+    Put put = new Put(row);
+    put.add(fam1, qual1, Bytes.toBytes(value));
+    region.put(put);
+
+    long result = region.incrementColumnValue(row, fam1, qual1, amount, true);
+
+    assertEquals(value+amount, result);
+
+    Store store = region.getStore(fam1);
+    // ICV removes any extra values floating around in there.
+    assertEquals(1, store.memstore.kvset.size());
+    assertTrue(store.memstore.snapshot.isEmpty());
+
+    assertICV(row, fam1, qual1, value+amount);
+  }
+
+  /**
+   * Test multi-threaded increments.
+   */
+  public void testIncrementMultiThreads() throws IOException {
+
+    LOG.info("Starting test testIncrementMultiThreads");
+    initHRegion(tableName, getName(), fam1);
+
+    // create 100 threads, each will increment by its own quantity
+    int numThreads = 100;
+    int incrementsPerThread = 1000;
+    Incrementer[] all = new Incrementer[numThreads];
+    int expectedTotal = 0;
+
+    // create all threads
+    for (int i = 0; i < numThreads; i++) {
+      all[i] = new Incrementer(region, i, i, incrementsPerThread);
+      expectedTotal += (i * incrementsPerThread);
+    }
+
+    // run all threads
+    for (int i = 0; i < numThreads; i++) {
+      all[i].start();
+    }
+
+    // wait for all threads to finish
+    for (int i = 0; i < numThreads; i++) {
+      try {
+        all[i].join();
+      } catch (InterruptedException e) {
+      }
+    }
+    assertICV(row, fam1, qual1, expectedTotal);
+    LOG.info("testIncrementMultiThreads successfully verified that total is " +
+             expectedTotal);
+  }
+
+
+  private void assertICV(byte [] row,
+                         byte [] familiy,
+                         byte[] qualifier,
+                         long amount) throws IOException {
+    // run a get and see?
+    Get get = new Get(row);
+    get.addColumn(familiy, qualifier);
+    Result result = region.get(get, null);
+    assertEquals(1, result.size());
+
+    KeyValue kv = result.raw()[0];
+    long r = Bytes.toLong(kv.getValue());
+    assertEquals(amount, r);
+  }
+
+  private void initHRegion (byte [] tableName, String callingMethod,
+      byte[] ... families)
+    throws IOException {
+    initHRegion(tableName, callingMethod, HBaseConfiguration.create(), families);
+  }
+
+  private void initHRegion (byte [] tableName, String callingMethod,
+      Configuration conf, byte [] ... families)
+    throws IOException{
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    for(byte [] family : families) {
+      htd.addFamily(new HColumnDescriptor(family));
+    }
+    HRegionInfo info = new HRegionInfo(htd.getName(), null, null, false);
+    Path path = new Path(DIR + callingMethod);
+    if (fs.exists(path)) {
+      if (!fs.delete(path, true)) {
+        throw new IOException("Failed delete of " + path);
+      }
+    }
+    region = HRegion.createHRegion(info, path, conf, htd);
+  }
+
+  /**
+   * A thread that makes a few increment calls
+   */
+  public static class Incrementer extends Thread {
+
+    private final HRegion region;
+    private final int threadNumber;
+    private final int numIncrements;
+    private final int amount;
+
+    private int count;
+
+    public Incrementer(HRegion region,
+        int threadNumber, int amount, int numIncrements) {
+      this.region = region;
+      this.threadNumber = threadNumber;
+      this.numIncrements = numIncrements;
+      this.count = 0;
+      this.amount = amount;
+      setDaemon(true);
+    }
+
+    @Override
+    public void run() {
+      for (int i=0; i] " +
+        " [-numIterationsPerThread ] " +
+        " [-path ]" +
+        " [-nosync]");
+  }
+
+  /**
+   * A thread that writes data to an HLog
+   */
+  public static class LogWriter extends Thread {
+
+    private final HRegion region;
+    private final int threadNumber;
+    private final int numIncrements;
+    private final HLog hlog;
+    private boolean appendNoSync;
+    private byte[] tableName;
+
+    private int count;
+
+    public LogWriter(HRegion region, byte[] tableName,
+        HLog log, int threadNumber,
+        int numIncrements, boolean appendNoSync) {
+      this.region = region;
+      this.threadNumber = threadNumber;
+      this.numIncrements = numIncrements;
+      this.hlog = log;
+      this.count = 0;
+      this.appendNoSync = appendNoSync;
+      this.tableName = tableName;
+      setDaemon(true);
+      //log("LogWriter[" + threadNumber + "] instantiated");
+    }
+
+    @Override
+    public void run() {
+      long now = System.currentTimeMillis();
+      byte [] key = Bytes.toBytes("thisisakey");
+      KeyValue kv = new KeyValue(key, now);
+      WALEdit walEdit = new WALEdit();
+      walEdit.add(kv);
+      HRegionInfo hri = region.getRegionInfo();
+      HTableDescriptor htd = new HTableDescriptor();
+      htd.addFamily(new HColumnDescriptor(Bytes.toBytes("f1")));
+      boolean isMetaRegion = false;
+      long start = System.currentTimeMillis();
+      for (int i=0; i pendingWrites = new LinkedList();
+
     LogSyncer(long optionalFlushInterval) {
       this.optionalFlushInterval = optionalFlushInterval;
     }
@@ -1075,7 +1146,9 @@
       while(!this.isInterrupted()) {

         try {
-          Thread.sleep(this.optionalFlushInterval);
+          if (unflushedEntries.get() <= syncedTillHere) {
+            Thread.sleep(this.optionalFlushInterval);
+          }
           sync();
         } catch (IOException e) {
           LOG.error("Error while syncing, requesting close of hlog ", e);
@@ -1088,38 +1161,85 @@
         LOG.info(getName() + " exiting");
       }
     }
+
+    // appends new writes to the pendingWrites. It is better to keep it in
+    // our own queue rather than writing it to the HDFS output stream because
+    // HDFSOutputStream.writeChunk is not lightweight at all.
+    synchronized void append(Entry e) throws IOException {
+      pendingWrites.add(e);
+    }
+
+    // Returns all currently pending writes. New writes
+    // will accumulate in a new list.
+    synchronized List<Entry> getPendingWrites() {
+      List<Entry> save = this.pendingWrites;
+      this.pendingWrites = new LinkedList<Entry>();
+      return save;
+    }
+
+    // writes out pending entries to the HLog
+    void hlogFlush(Writer writer) throws IOException {
+      // Atomically fetch all existing pending writes. New writes
+      // will start accumulating in a new list.
+      List<Entry> pending = getPendingWrites();
+
+      // write out all accumulated Entries to hdfs.
+      for (Entry e : pending) {
+        writer.append(e);
+      }
+    }
   }
+  // sync all known transactions
   private void syncer() throws IOException {
+    syncer(this.unflushedEntries.get()); // sync all pending items
+  }
+
+  // sync all transactions upto the specified txid
+  private void syncer(long txid) throws IOException {
     synchronized (this.updateLock) {
-      if (this.closed) {
-        return;
-      }
+      if (this.closed) return;
     }
+    // if the transaction that we are interested in is already
+    // synced, then return immediately.
+    if (txid <= this.syncedTillHere) {
+      return;
+    }
     try {
+      long doneUpto = this.unflushedEntries.get();
       long now = System.currentTimeMillis();
       // Done in parallel for all writer threads, thanks to HDFS-895
       boolean syncSuccessful = true;
       try {
+        // First flush all the pending writes to HDFS. Then
+        // issue the sync to HDFS. If sync is successful, then update
+        // syncedTillHere to indicate that transactions till this
+        // number has been successfully synced.
+        logSyncerThread.hlogFlush(this.writer);
         this.writer.sync();
+        syncBatchSize.addAndGet(doneUpto - this.syncedTillHere);
+        this.syncedTillHere = doneUpto;
       } catch(IOException io) {
        syncSuccessful = false;
       }
-      synchronized (this.updateLock) {
-        if (!syncSuccessful) {
+      if (!syncSuccessful) {
+        synchronized (this.updateLock) {
          // HBASE-4387, retry with updateLock held
          this.writer.sync();
+          syncBatchSize.addAndGet(doneUpto - this.syncedTillHere);
+          this.syncedTillHere = doneUpto;
         }
-        syncTime += System.currentTimeMillis() - now;
-        syncOps++;
-        if (!this.logRollRunning) {
-          checkLowReplication();
-          if (this.writer.getLength() > this.logrollsize) {
-            requestLogRoll();
-          }
+      }
+      // We try to not acquire the updateLock just to update statistics.
+      // Make these statistics as AtomicLong.
+      syncTime.addAndGet(System.currentTimeMillis() - now);
+      syncOps.incrementAndGet();
+      if (!this.logRollRunning) {
+        checkLowReplication();
+        if (this.writer.getLength() > this.logrollsize) {
+          requestLogRoll();
         }
       }
-
     } catch (IOException e) {
       LOG.fatal("Could not sync. Requesting close of hlog", e);
       requestLogRoll();
@@ -1212,6 +1332,10 @@
     syncer();
   }

+  public void sync(long txid) throws IOException {
+    syncer(txid);
+  }
+
   private void requestLogRoll() {
     if (!this.listeners.isEmpty()) {
       for (WALActionsListener i: this.listeners) {
@@ -1235,8 +1359,8 @@
     long now = System.currentTimeMillis();
     // coprocessor hook:
     if (!coprocessorHost.preWALWrite(info, logKey, logEdit)) {
-      // if not bypassed:
-      this.writer.append(new HLog.Entry(logKey, logEdit));
+      // write to our buffer for the Hlog file.
+      logSyncerThread.append(new HLog.Entry(logKey, logEdit));
     }
     long took = System.currentTimeMillis() - now;
     coprocessorHost.postWALWrite(info, logKey, logEdit);
@@ -1357,18 +1481,20 @@
     if (this.closed) {
       return;
     }
+    long txid = 0;
     synchronized (updateLock) {
       long now = System.currentTimeMillis();
       WALEdit edit = completeCacheFlushLogEdit();
       HLogKey key = makeKey(encodedRegionName, tableName, logSeqId,
           System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID);
-      this.writer.append(new Entry(key, edit));
+      logSyncerThread.append(new Entry(key, edit));
+      txid = this.unflushedEntries.incrementAndGet();
       writeTime += System.currentTimeMillis() - now;
       writeOps++;
       this.numEntries.incrementAndGet();
     }
     // sync txn to file system
-    this.sync();
+    this.sync(txid);
     } finally {
       // updateLock not needed for removing snapshot's entry
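Note (not part of the patch): the HLog changes amount to a group commit. appendNoSync() only queues an Entry on LogSyncer.pendingWrites and bumps unflushedEntries; syncer(txid) returns immediately when txid is already covered by syncedTillHere, and otherwise drains the queue via hlogFlush(), issues a single writer.sync(), and advances syncedTillHere, so many concurrent writers share one HDFS sync. The condensed sketch below shows only that batching idea; WalWriter is a hypothetical stand-in for HLog.Writer, and the patch's retry, statistics, and log-roll handling are omitted.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Condensed sketch of the batched-sync idea from the HLog diff. WalWriter is a
// hypothetical stand-in for HLog.Writer; error handling and retries are elided.
public class GroupCommitSketch {

  public interface WalWriter {
    void append(String entry) throws IOException;
    void sync() throws IOException;   // one sync call covers every entry appended so far
  }

  private final WalWriter writer;
  private final AtomicLong unflushedEntries = new AtomicLong(0);
  private volatile long syncedTillHere = 0;
  private final Object flushLock = new Object();      // serializes access to the writer
  private List<String> pendingWrites = new ArrayList<String>();

  public GroupCommitSketch(WalWriter writer) {
    this.writer = writer;
  }

  // Analogous to appendNoSync(): buffer the edit, hand back its transaction id.
  public synchronized long appendNoSync(String entry) {
    pendingWrites.add(entry);
    return unflushedEntries.incrementAndGet();
  }

  // Analogous to syncer(txid): cheap no-op when txid is already durable,
  // otherwise flush the whole buffer and sync once for the batch.
  public void sync(long txid) throws IOException {
    if (txid <= syncedTillHere) {
      return;                         // another caller already synced past this txid
    }
    synchronized (flushLock) {
      if (txid <= syncedTillHere) {
        return;                       // it became durable while we waited for the lock
      }
      long doneUpto = unflushedEntries.get();
      List<String> batch;
      synchronized (this) {           // briefly block appenders to swap the buffer
        batch = pendingWrites;
        pendingWrites = new ArrayList<String>();
      }
      for (String e : batch) {
        writer.append(e);
      }
      writer.sync();                  // single HDFS round trip for the whole batch
      syncedTillHere = doneUpto;
    }
  }
}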