Index: src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogBench.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogBench.java	(revision 0)
+++ src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogBench.java	(revision 0)
@@ -0,0 +1,349 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.IOException;
+import java.util.Random;
+import java.text.NumberFormat;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.log4j.Level;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.ipc.HBaseRPC;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
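+/**
+ * A microbenchmark for HLog appends: spins up a configurable number of
+ * writer threads, each appending a configurable number of edits to a
+ * single HLog, either sync-ing every append or (with -nosync) leaving
+ * the appends unsynced, and reports the accumulated append time.
+ */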
+public class TestHLogBench extends Configured implements Tool {
+
+  static final Log LOG = LogFactory.getLog(TestHLogBench.class);
+  private static final Random r = new Random();
+
+  private static final byte [] FAMILY = Bytes.toBytes("hlogbenchFamily");
+
+  // accumulate time here
+  private static int totalTime = 0;
+  private static Object lock = new Object();
+
+  // the file system in which to create the HLog file
+  protected FileSystem fs;
+
+  // the number of threads and the number of iterations per thread
+  private int numThreads = 300;
+  private int numIterationsPerThread = 10000;
+
+  private Path regionRootDir = new Path(HBaseTestingUtility.getTestDir() +
+                                        "/TestHLogBench/");
+
+  private boolean appendNoSync = false;
+
+  public TestHLogBench() {
+    this(null);
+  }
+
+  private TestHLogBench(Configuration conf) {
+    super(conf);
+    fs = null;
+  }
+
+  /**
+   * Initialize the file system object
+   */
+  public void init() throws IOException {
+    getConf().setQuietMode(true);
+    if (this.fs == null) {
+      this.fs = FileSystem.get(getConf());
+    }
+  }
+
+  /**
+   * Close down the file system
+   */
+  public void close() throws IOException {
+    if (fs != null) {
+      fs.close();
+      fs = null;
+    }
+  }
+
+  /**
+   * The main run method of TestHLogBench
+   */
+  public int run(String argv[]) throws Exception {
+
+    int exitCode = -1;
+    int i = 0;
+
+    // verify that we have enough command line parameters
+    if (argv.length < 4) {
+      printUsage("");
+      return exitCode;
+    }
+
+    // initialize LogBench
+    try {
+      init();
+    } catch (HBaseRPC.VersionMismatch v) {
+      LOG.warn("Version mismatch between client and server" +
+               "... command aborted.");
+      return exitCode;
+    } catch (IOException e) {
+      LOG.warn("Bad connection to FS. Command aborted.");
+      return exitCode;
+    }
+
+    try {
+      for (; i < argv.length; i++) {
+        if ("-numThreads".equals(argv[i])) {
+          i++;
+          this.numThreads = Integer.parseInt(argv[i]);
+        } else if ("-numIterationsPerThread".equals(argv[i])) {
+          i++;
+          this.numIterationsPerThread = Integer.parseInt(argv[i]);
+        } else if ("-path".equals(argv[i])) {
+          // get an absolute path using the default file system
+          i++;
+          this.regionRootDir = new Path(argv[i]);
+          this.regionRootDir = regionRootDir.makeQualified(this.fs);
+        } else if ("-nosync".equals(argv[i])) {
+          this.appendNoSync = true;
+        } else {
+          printUsage(argv[i]);
+          return exitCode;
+        }
+      }
+    } catch (NumberFormatException nfe) {
+      LOG.warn("Illegal numThreads or numIterationsPerThread, " +
+               "a positive integer expected");
+      throw nfe;
+    }
+    go();
+    return 0;
+  }
+
+  private void go() throws IOException, InterruptedException {
+
+    long start = System.currentTimeMillis();
+    log("Running TestHLogBench with " + numThreads + " threads each doing " +
+        numIterationsPerThread + " HLog appends " +
+        (appendNoSync ? "nosync" : "sync") +
+        " at rootDir " + regionRootDir);
+
+    // Mock an HRegion
+    byte [] tableName = Bytes.toBytes("table");
+    byte [][] familyNames = new byte [][] { FAMILY };
+    HTableDescriptor htd = new HTableDescriptor();
+    htd.addFamily(new HColumnDescriptor(Bytes.toBytes("f1")));
+    HRegion region = mockRegion(tableName, familyNames, regionRootDir);
+    HLog hlog = region.getLog();
+
+    // Spin up N threads to each perform M log operations
+    LogWriter [] incrementors = new LogWriter[numThreads];
+    for (int i = 0; i < numThreads; i++) {
+      incrementors[i] = new LogWriter(region, tableName, hlog, i,
+                                      numIterationsPerThread, appendNoSync);
+      incrementors[i].start();
+    }
+
+    // wait for all threads to finish
+    for (int i = 0; i < numThreads; i++) {
+      incrementors[i].join();
+    }
+
+    // report the aggregate time accumulated by all writer threads
+    long numOps = (long) numThreads * numIterationsPerThread;
+    log("Finished " + numOps + " appends in " +
+        (System.currentTimeMillis() - start) + " ms; total append time " +
+        totalTime + " ms");
+  }
+
+  /**
+   * Displays the format of the command line arguments
+   */
+  private static void printUsage(String cmd) {
+    if (cmd.length() > 0) {
+      System.out.println("Unrecognized argument: " + cmd);
+    }
+    System.out.println("Usage: TestHLogBench " +
+                       " [-numThreads <number of threads>] " +
+                       " [-numIterationsPerThread <iterations per thread>] " +
+                       " [-path <root directory for the HLog>]" +
+                       " [-nosync]");
+  }
+
+  /**
+   * A thread that writes data to an HLog
+   */
+  public static class LogWriter extends Thread {
+
+    private final HRegion region;
+    private final int threadNumber;
+    private final int numIncrements;
+    private final HLog hlog;
+    private boolean appendNoSync;
+    private byte[] tableName;
+
+    private int count;
+
+    public LogWriter(HRegion region, byte[] tableName,
+                     HLog log, int threadNumber,
+                     int numIncrements, boolean appendNoSync) {
+      this.region = region;
+      this.threadNumber = threadNumber;
+      this.numIncrements = numIncrements;
+      this.hlog = log;
+      this.count = 0;
+      this.appendNoSync = appendNoSync;
+      this.tableName = tableName;
+      setDaemon(true);
+      //log("LogWriter[" + threadNumber + "] instantiated");
+    }
+
+    @Override
+    public void run() {
+      long now = System.currentTimeMillis();
+      byte [] key = Bytes.toBytes("thisisakey");
+      KeyValue kv = new KeyValue(key, now);
+      WALEdit walEdit = new WALEdit();
+      walEdit.add(kv);
+      HRegionInfo hri = region.getRegionInfo();
+      HTableDescriptor htd = new HTableDescriptor();
+      htd.addFamily(new HColumnDescriptor(Bytes.toBytes("f1")));
+      boolean isMetaRegion = false;
+      long start = System.currentTimeMillis();
+      for (int i = 0; i < numIncrements; i++) {
+        try {
+          // either a buffered, unsynced append or an append that also
+          // forces an HDFS sync
+          if (appendNoSync) {
+            hlog.appendNoSync(hri, tableName, walEdit,
+                HConstants.DEFAULT_CLUSTER_ID, now, htd);
+          } else {
+            hlog.append(hri, tableName, walEdit,
+                HConstants.DEFAULT_CLUSTER_ID, now, htd);
+          }
+        } catch (IOException e) {
+          LOG.error("LogWriter[" + threadNumber + "] append failed", e);
+        }
+        count++;
+      }
+      long elapsed = System.currentTimeMillis() - start;
+      synchronized (lock) {
+        totalTime += (int) elapsed;   // accumulate time across all threads
+      }
+      log("LogWriter[" + threadNumber + "] finished " + count +
+          " appends in " + elapsed + " ms");
+    }
+  }
+
+  private static void log(String msg) {
+    LOG.info(msg);
+  }
+
+  /**
+   * Create a bare region (and its HLog) under the specified root directory.
+   */
+  private HRegion mockRegion(byte[] tableName, byte[][] familyNames,
+      Path rootDir) throws IOException {
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    for (byte[] family : familyNames) {
+      htd.addFamily(new HColumnDescriptor(family));
+    }
+    HRegionInfo hri = new HRegionInfo(tableName,
+        HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
+    return HRegion.createHRegion(hri, rootDir, getConf(), htd);
+  }
+
+  public static void main(String[] argv) throws Exception {
+    System.exit(ToolRunner.run(new TestHLogBench(), argv));
+  }
+}
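The HLog.java hunks that follow implement the group-commit core of this patch: appends from writer threads go into an in-process buffer instead of the HDFS output stream, and a single flush-plus-sync makes the whole buffered batch durable at once. As a reading aid, here is a minimal, self-contained sketch of that pattern; the names mirror the patch (pendingWrites, unflushedEntries, syncedTillHere), but this is an illustration, not the HBase API:

    import java.io.IOException;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicLong;

    // Toy group-commit log: appendNoSync() is a cheap in-memory buffer
    // insert; sync(txid) drains the buffer, does one expensive flush, and
    // marks every buffered transaction durable in a single step.
    class GroupCommitLog {
      private final AtomicLong unflushedEntries = new AtomicLong(0);
      private volatile long syncedTillHere = 0;
      private final Object flushLock = new Object();
      private List<String> pendingWrites = new LinkedList<String>();

      // buffer one edit and return its transaction id
      synchronized long appendNoSync(String edit) {
        pendingWrites.add(edit);
        return unflushedEntries.incrementAndGet();
      }

      // block until everything up to txid is durable
      void sync(long txid) throws IOException {
        if (txid <= syncedTillHere) {
          return;                      // an earlier sync already covered us
        }
        synchronized (flushLock) {     // one flusher at a time
          if (txid <= syncedTillHere) {
            return;                    // re-check: covered while we waited
          }
          long doneUpto = unflushedEntries.get();
          List<String> batch;
          synchronized (this) {        // atomically swap out the pending batch
            batch = pendingWrites;
            pendingWrites = new LinkedList<String>();
          }
          for (String edit : batch) {
            write(edit);               // stand-in for Writer.append(entry)
          }
          flush();                     // stand-in for Writer.sync()
          syncedTillHere = doneUpto;   // everything up to doneUpto is durable
        }
      }

      private void write(String edit) throws IOException { /* elided */ }
      private void flush() throws IOException { /* elided */ }
    }

The payoff is that when many threads sync concurrently, most of them return at the first check: one HDFS sync amortizes over every transaction buffered behind it.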
Index: src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java	(revision 1177401)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java	(working copy)
@@ -1067,5 +1136,7 @@
     private final long optionalFlushInterval;
 
+    private List<Entry> pendingWrites = new LinkedList<Entry>();
+
     LogSyncer(long optionalFlushInterval) {
       this.optionalFlushInterval = optionalFlushInterval;
     }
@@ -1075,7 +1146,9 @@
       while(!this.isInterrupted()) {
 
         try {
-          Thread.sleep(this.optionalFlushInterval);
+          if (unflushedEntries.get() <= syncedTillHere) {
+            Thread.sleep(this.optionalFlushInterval);
+          }
           sync();
         } catch (IOException e) {
           LOG.error("Error while syncing, requesting close of hlog ", e);
@@ -1088,38 +1161,83 @@
         LOG.info(getName() + " exiting");
       }
     }
+
+    // Appends new writes to pendingWrites. It is better to keep them in
+    // our own queue rather than writing them to the HDFS output stream,
+    // because HDFSOutputStream.writeChunk is not lightweight at all.
+    synchronized void append(Entry e) throws IOException {
+      pendingWrites.add(e);
+    }
+
+    // Returns all currently pending writes. New writes
+    // will accumulate in a new list.
+    synchronized List<Entry> getPendingWrites() {
+      List<Entry> save = this.pendingWrites;
+      this.pendingWrites = new LinkedList<Entry>();
+      return save;
+    }
+
+    // Writes out all pending entries to the HLog
+    void hlogFlush(Writer writer) throws IOException {
+      // Atomically fetch all existing pending writes. New writes
+      // will start accumulating in a new list.
+      List<Entry> pending = getPendingWrites();
+
+      // write out all accumulated Entries to hdfs.
+      for (Entry e : pending) {
+        writer.append(e);
+      }
+    }
   }
 
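+
+  // Group commit in a nutshell: writer threads hand their edits to
+  // LogSyncer.append() (a cheap, in-memory buffer insert) and each edit
+  // gets a txid from unflushedEntries. Whichever thread enters syncer()
+  // first drains the buffer via hlogFlush(), issues a single HDFS sync
+  // for the whole batch, and advances syncedTillHere; a waiter whose txid
+  // is already covered returns immediately without doing I/O of its own.
+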
+  // sync all known transactions
   private void syncer() throws IOException {
-    synchronized (this.updateLock) {
-      if (this.closed) {
-        return;
-      }
+    syncer(this.unflushedEntries.get()); // sync all pending items
+  }
+
+  // sync all transactions up to the specified txid
+  private void syncer(long txid) throws IOException {
+
+    // if the transaction that we are interested in is already
+    // synced, then return immediately.
+    if (txid <= this.syncedTillHere) {
+      return;
     }
     try {
+      long doneUpto = this.unflushedEntries.get();
       long now = System.currentTimeMillis();
       // Done in parallel for all writer threads, thanks to HDFS-895
       boolean syncSuccessful = true;
       try {
+        // First flush all the pending writes to HDFS, then issue the sync.
+        // If the sync succeeds, advance syncedTillHere to indicate that all
+        // transactions up to that number have been successfully synced.
+        logSyncerThread.hlogFlush(this.writer);
         this.writer.sync();
+        syncBatchSize.addAndGet(doneUpto - this.syncedTillHere);
+        this.syncedTillHere = doneUpto;
       } catch(IOException io) {
        syncSuccessful = false;
       }
-      synchronized (this.updateLock) {
-        if (!syncSuccessful) {
+      if (!syncSuccessful) {
+        synchronized (this.updateLock) {
           // HBASE-4387, retry with updateLock held
           this.writer.sync();
+          syncBatchSize.addAndGet(doneUpto - this.syncedTillHere);
+          this.syncedTillHere = doneUpto;
         }
-        syncTime += System.currentTimeMillis() - now;
-        syncOps++;
-        if (!this.logRollRunning) {
-          checkLowReplication();
-          if (this.writer.getLength() > this.logrollsize) {
-            requestLogRoll();
-          }
+      }
+      // We avoid acquiring the updateLock just to update statistics;
+      // these statistics are therefore kept as AtomicLongs.
+      syncTime.addAndGet(System.currentTimeMillis() - now);
+      syncOps.incrementAndGet();
+      if (!this.logRollRunning) {
+        checkLowReplication();
+        if (this.writer.getLength() > this.logrollsize) {
+          requestLogRoll();
         }
       }
-
     } catch (IOException e) {
       LOG.fatal("Could not sync. Requesting close of hlog", e);
       requestLogRoll();
@@ -1212,6 +1330,10 @@
     syncer();
   }
 
+  public void sync(long txid) throws IOException {
+    syncer(txid);
+  }
+
   private void requestLogRoll() {
     if (!this.listeners.isEmpty()) {
       for (WALActionsListener i: this.listeners) {
@@ -1235,8 +1357,8 @@
       long now = System.currentTimeMillis();
       // coprocessor hook:
       if (!coprocessorHost.preWALWrite(info, logKey, logEdit)) {
-        // if not bypassed:
-        this.writer.append(new HLog.Entry(logKey, logEdit));
+        // write to our buffer for the HLog file.
+        logSyncerThread.append(new HLog.Entry(logKey, logEdit));
       }
       long took = System.currentTimeMillis() - now;
       coprocessorHost.postWALWrite(info, logKey, logEdit);
@@ -1357,18 +1479,20 @@
       if (this.closed) {
         return;
       }
+      long txid = 0;
       synchronized (updateLock) {
         long now = System.currentTimeMillis();
         WALEdit edit = completeCacheFlushLogEdit();
         HLogKey key = makeKey(encodedRegionName, tableName, logSeqId,
             System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID);
-        this.writer.append(new Entry(key, edit));
+        logSyncerThread.append(new Entry(key, edit));
+        txid = this.unflushedEntries.incrementAndGet();
         writeTime += System.currentTimeMillis() - now;
         writeOps++;
         this.numEntries.incrementAndGet();
       }
       // sync txn to file system
-      this.sync();
+      this.sync(txid);
     } finally {
       // updateLock not needed for removing snapshot's entry
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java	(revision 1177401)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java	(working copy)
@@ -3503,6 +3503,7 @@
     List<KeyValue> kvs = new ArrayList<KeyValue>(increment.numColumns());
     long now = EnvironmentEdgeManager.currentTimeMillis();
     long size = 0;
+    long txid = 0;
 
     // Lock row
     startRegionOperation();
@@ -3561,7 +3562,7 @@
           // Using default cluster id, as this can only happen in the orginating
           // cluster. A slave cluster receives the final value (not the delta)
           // as a Put.
-          this.log.append(regionInfo, this.htableDescriptor.getName(),
+          txid = this.log.appendNoSync(regionInfo, this.htableDescriptor.getName(),
               walEdits, HConstants.DEFAULT_CLUSTER_ID, now,
               this.htableDescriptor);
         }
@@ -3572,6 +3573,9 @@
         this.updatesLock.readLock().unlock();
         releaseRowLock(lid);
       }
+      if (writeToWAL) {
+        this.log.sync(txid); // sync the transaction log outside the rowlock
+      }
     } finally {
       closeRegionOperation();
     }
@@ -3599,6 +3603,7 @@
     checkRow(row, "increment");
     boolean flush = false;
     boolean wrongLength = false;
+    long txid = 0;
     // Lock row
     long result = amount;
     startRegionOperation();
@@ -3642,7 +3647,7 @@
           // Using default cluster id, as this can only happen in the
           // orginating cluster. A slave cluster receives the final value (not
           // the delta) as a Put.
-          this.log.append(regionInfo, this.htableDescriptor.getName(),
+          txid = this.log.appendNoSync(regionInfo, this.htableDescriptor.getName(),
               walEdit, HConstants.DEFAULT_CLUSTER_ID, now,
               this.htableDescriptor);
         }
@@ -3659,6 +3664,9 @@
         this.updatesLock.readLock().unlock();
         releaseRowLock(lid);
       }
+      if (writeToWAL) {
+        this.log.sync(txid); // sync the transaction log outside the rowlock
+      }
     } finally {
       closeRegionOperation();
     }
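The HRegion.java hunks above are the consumers of that API: the edit is buffered with appendNoSync() while the row lock is held, but the expensive HDFS sync happens only after the lock is released. A condensed sketch of that caller pattern, reusing the GroupCommitLog toy from above (again illustrative names, not the HBase API):

    import java.io.IOException;
    import java.util.concurrent.locks.ReentrantLock;

    class IncrementSketch {
      private final GroupCommitLog log = new GroupCommitLog();
      private final ReentrantLock rowLock = new ReentrantLock();

      void increment(String walEdit, boolean writeToWAL) throws IOException {
        long txid = 0;
        rowLock.lock();
        try {
          // apply the increment to the in-memory store here ...
          if (writeToWAL) {
            txid = log.appendNoSync(walEdit);   // cheap, in-memory append
          }
        } finally {
          rowLock.unlock();   // release before the expensive sync
        }
        if (writeToWAL) {
          log.sync(txid);     // block only until this txid is durable
        }
      }
    }

Durability semantics are unchanged, because the caller still does not return until sync(txid) completes; what changes is that other writers to the same row no longer wait on HDFS while the row lock is held.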