Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestParallelPut.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/regionserver/TestParallelPut.java	(revision 0)
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestParallelPut.java	(revision 0)
@@ -0,0 +1,259 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestCase;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HDFSBlocksDistribution;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MultithreadedTestUtil;
+import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.NullComparator;
+import org.apache.hadoop.hbase.filter.PrefixFilter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
+import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
+import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.PairOfSameType;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+
+/**
+ * Testing of multiPut in parallel.
+ *
+ */
+public class TestParallelPut extends HBaseTestCase {
+  static final Log LOG = LogFactory.getLog(TestParallelPut.class);
+
+  static private HRegion region = null;
+  private final String DIR = HBaseTestingUtility.getTestDir() +
+    "/TestParallelPut/";
+
+  private final int MAX_VERSIONS = 2;
+
+  // Test names
+  static final byte[] tableName = Bytes.toBytes("testtable");
+  static final byte[] qual1 = Bytes.toBytes("qual1");
+  static final byte[] qual2 = Bytes.toBytes("qual2");
+  static final byte[] qual3 = Bytes.toBytes("qual3");
+  static final byte[] value1 = Bytes.toBytes("value1");
+  static final byte[] value2 = Bytes.toBytes("value2");
+  static final byte [] row = Bytes.toBytes("rowA");
+  static final byte [] row2 = Bytes.toBytes("rowB");
+
+  /**
+   * @see org.apache.hadoop.hbase.HBaseTestCase#setUp()
+   */
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+  }
+
+  @Override
+  protected void tearDown() throws Exception {
+    super.tearDown();
+    EnvironmentEdgeManagerTestHelper.reset();
+  }
+
+  //////////////////////////////////////////////////////////////////////////////
+  // New tests that don't spin up a mini cluster but rather just test the
+  // individual code pieces in the HRegion.
+  //////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Test one put command.
+   */
+  public void testPut() throws IOException {
+    LOG.info("Starting testPut");
+    initHRegion(tableName, getName(), fam1);
+
+    long value = 1L;
+
+    Put put = new Put(row);
+    put.add(fam1, qual1, Bytes.toBytes(value));
+    region.put(put);
+
+    assertGet(row, fam1, qual1, Bytes.toBytes(value));
+  }
+
+  /**
+   * Test multi-threaded Puts.
+   */
+  public void testParallelPuts() throws IOException {
+
+    LOG.info("Starting test testParallelPuts");
+    initHRegion(tableName, getName(), fam1);
+    int numOps = 1000; // this many operations per thread
+
+    // create 100 threads, each will do its own puts
+    int numThreads = 100;
+    Putter[] all = new Putter[numThreads];
+
+    // create all threads
+    for (int i = 0; i < numThreads; i++) {
+      all[i] = new Putter(region, i, numOps);
+    }
+
+    // run all threads
+    for (int i = 0; i < numThreads; i++) {
+      all[i].start();
+    }
+
+    // wait for all threads to finish
+    for (int i = 0; i < numThreads; i++) {
+      try {
+        all[i].join();
+      } catch (InterruptedException e) {
+        // ignore and keep waiting for the remaining threads
+      }
+    }
+    LOG.info("testParallelPuts successfully verified " +
+             (numOps * numThreads) + " put operations.");
+  }
+
+
+  static private void assertGet(byte [] row,
+                                byte [] family,
+                                byte[] qualifier,
+                                byte[] value) throws IOException {
+    // run a get and see if the value matches
+    Get get = new Get(row);
+    get.addColumn(family, qualifier);
+    Result result = region.get(get, null);
+    assertEquals(1, result.size());
+
+    KeyValue kv = result.raw()[0];
+    byte[] r = kv.getValue();
+    assertTrue(Bytes.compareTo(r, value) == 0);
+  }
+
+  private void initHRegion(byte [] tableName, String callingMethod,
+                           byte[] ... families)
+    throws IOException {
+    initHRegion(tableName, callingMethod, HBaseConfiguration.create(), families);
+  }
+
+  private void initHRegion(byte [] tableName, String callingMethod,
+                           Configuration conf, byte [] ... families)
+    throws IOException {
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    for (byte [] family : families) {
+      htd.addFamily(new HColumnDescriptor(family));
+    }
+    HRegionInfo info = new HRegionInfo(htd.getName(), null, null, false);
+    Path path = new Path(DIR + callingMethod);
+    if (fs.exists(path)) {
+      if (!fs.delete(path, true)) {
+        throw new IOException("Failed delete of " + path);
+      }
+    }
+    region = HRegion.createHRegion(info, path, conf, htd);
+  }
+
+  /**
+   * A thread that makes a few put calls
+   */
+  public static class Putter extends Thread {
+
+    private final HRegion region;
+    private final int threadNumber;
+    private final int numOps;
+    private final Random rand = new Random();
+    byte [] rowt = null;
+
+    public Putter(HRegion region, int threadNumber, int numOps) {
+      this.region = region;
+      this.threadNumber = threadNumber;
+      this.numOps = numOps;
+      this.rowt = Bytes.toBytes((long)threadNumber); // unique rowid per thread
+      setDaemon(true);
+    }
+
+    @Override
+    public void run() {
+      byte[] value = new byte[100];
+      Put[] in = new Put[1];
+
+      // iterate for the specified number of operations
+      for (int i = 0; i < numOps; i++) {
+        // generate random bytes
+        rand.nextBytes(value);
+
+        // put the random bytes into this thread's row and immediately read
+        // them back, verifying that each put is visible to a get as soon as
+        // HRegion.put() returns
+        Put put = new Put(rowt);
+        put.add(fam1, qual1, value);
+        in[0] = put;
+        try {
+          region.put(in);
+          assertGet(rowt, fam1, qual1, value);
+        } catch (IOException e) {
+          fail("Thread id " + threadNumber + " operation " + i + " failed.");
+        }
+      }
+    }
+  }
+}
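A note on what the test above demands of the server: assertGet() runs inside Putter.run() immediately after each put, so an edit must be visible to a reader the moment HRegion.put() returns. That is the guarantee the HRegion.java changes below have to preserve while reordering the write path. As background, here is a minimal sketch of the consistency contract involved. ReadWriteConsistencyControl, beginMemstoreInsert() and completeMemstoreInsert() are the names used in the patch; the single-counter internals below are a simplified assumption for illustration, not HBase's actual implementation (the real class tracks a queue of pending writers so the read point never jumps past an incomplete insert).

// Toy model of the RWCC visibility rule: edits are stamped with a write
// number, and readers ignore anything newer than the current read point,
// so an insert stays invisible until it is explicitly completed.
class ToyRwcc {
  private long memstoreRead = 0;   // read point: highest visible write number
  private long memstoreWrite = 0;  // highest write number handed out

  static class WriteEntry {
    final long writeNumber;
    WriteEntry(long writeNumber) { this.writeNumber = writeNumber; }
  }

  // Hand out the next write number; the caller stamps its KeyValues with it.
  synchronized WriteEntry beginMemstoreInsert() {
    return new WriteEntry(++memstoreWrite);
  }

  // Advance the read point; only now do readers see the stamped edits.
  // In the patch below this happens after the WAL sync has completed.
  synchronized void completeMemstoreInsert(WriteEntry e) {
    if (e.writeNumber > memstoreRead) {
      memstoreRead = e.writeNumber;
    }
  }

  // Scanners snapshot this and filter out KeyValues with a newer stamp.
  synchronized long getReadPoint() {
    return memstoreRead;
  }
}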
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ ... @@
-  public void delete(Map<byte[], List<KeyValue>> familyMap, UUID clusterId,
+  void delete(Map<byte[], List<KeyValue>> familyMap, UUID clusterId,
       boolean writeToWAL) throws IOException {
     Delete delete = new Delete();
     delete.setFamilyMap(familyMap);
@@ -1506,7 +1507,7 @@
     }
 
     // Now make changes to the memstore.
-    long addedSize = applyFamilyMapToMemstore(familyMap);
+    long addedSize = applyFamilyMapToMemstore(familyMap, null);
     flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize));
 
     if (coprocessorHost != null) {
@@ -1674,8 +1675,8 @@
       }
     }
 
-    long now = EnvironmentEdgeManager.currentTimeMillis();
-    byte[] byteNow = Bytes.toBytes(now);
+    ReadWriteConsistencyControl.WriteEntry w = null;
+    long txid = 0;
     boolean locked = false;
 
     /** Keep track of the locks we hold so we can release them in finally clause */
@@ -1734,6 +1735,12 @@
         lastIndexExclusive++;
         numReadyToWrite++;
       }
+
+      // we should record the timestamp only after we have acquired the rowLock,
+      // otherwise, newer puts are not guaranteed to have a newer timestamp
+      long now = EnvironmentEdgeManager.currentTimeMillis();
+      byte[] byteNow = Bytes.toBytes(now);
+
       // Nothing to put -- an exception in the above such as NoSuchColumnFamily?
       if (numReadyToWrite <= 0) return 0L;
 
@@ -1752,46 +1759,88 @@
             byteNow);
       }
 
       this.updatesLock.readLock().lock();
       locked = true;
+
+      //
+      // ------------------------------------
+      // Acquire the latest rwcc number
+      // ----------------------------------
+      w = rwcc.beginMemstoreInsert();
+
       // ------------------------------------
-      // STEP 3. Write to WAL
+      // STEP 3. Write back to memstore
+      // Write to memstore. It is ok to write to memstore
+      // first without updating the HLog because we do not roll
+      // forward the memstore RWCC. The RWCC will be moved up when
+      // the complete operation is done. These changes are not yet
+      // visible to scanners till we update the RWCC. The RWCC is
+      // moved only when the sync is complete.
       // ----------------------------------
+      long addedSize = 0;
       for (int i = firstIndex; i < lastIndexExclusive; i++) {
+        if (batchOp.retCodeDetails[i].getOperationStatusCode()
+            != OperationStatusCode.NOT_RUN) {
+          continue;
+        }
+        addedSize += applyFamilyMapToMemstore(familyMaps[i], w);
+      }
+
+      // ------------------------------------
+      // STEP 4. Write to WAL
+      // ----------------------------------
+      for (int i = firstIndex; i < lastIndexExclusive; i++) {
         // Skip puts that were determined to be invalid during preprocessing
         if (batchOp.retCodeDetails[i].getOperationStatusCode()
             != OperationStatusCode.NOT_RUN) {
          continue;
         }
+        batchOp.retCodeDetails[i] = new OperationStatus(OperationStatusCode.SUCCESS);
 
         Put p = batchOp.operations[i].getFirst();
         if (!p.getWriteToWAL()) continue;
         addFamilyMapToWALEdit(familyMaps[i], walEdit);
       }
 
-      // Append the edit to WAL
+      // -------------------------
+      // STEP 5. Append the edit to WAL. Do not sync wal.
+      // -------------------------
       Put first = batchOp.operations[firstIndex].getFirst();
-      this.log.append(regionInfo, this.htableDescriptor.getName(),
-        walEdit, first.getClusterId(), now, this.htableDescriptor);
+      txid = this.log.appendNoSync(regionInfo, this.htableDescriptor.getName(),
+        walEdit, first.getClusterId(), now, this.htableDescriptor);
 
-      // ------------------------------------
-      // STEP 4. Write back to memstore
-      // ----------------------------------
-      long addedSize = 0;
-      for (int i = firstIndex; i < lastIndexExclusive; i++) {
-        if (batchOp.retCodeDetails[i].getOperationStatusCode()
-            != OperationStatusCode.NOT_RUN) {
-          continue;
+      // -------------------------------
+      // STEP 6. Release row locks, etc.
+      // -------------------------------
+      if (locked) {
+        this.updatesLock.readLock().unlock();
+        locked = false;
+      }
+      if (acquiredLocks != null) {
+        for (Integer toRelease : acquiredLocks) {
+          releaseRowLock(toRelease);
         }
-        addedSize += applyFamilyMapToMemstore(familyMaps[i]);
-        batchOp.retCodeDetails[i] = new OperationStatus(
-            OperationStatusCode.SUCCESS);
+        acquiredLocks = null;
       }
 
+      // -------------------------
+      // STEP 7. Sync wal.
+      // -------------------------
+      if (walEdit.size() > 0) {
+        this.log.sync(txid);
+      }
+
+      // ------------------------------------------------------------------
+      // STEP 8. Advance rwcc. This will make this put visible to scanners and getters.
+      // ------------------------------------------------------------------
+      if (w != null) {
+        rwcc.completeMemstoreInsert(w);
+        w = null;
+      }
+
       // ------------------------------------
-      // STEP 5. Run coprocessor post hooks
+      // STEP 9. Run coprocessor post hooks. This should be done after the wal is
+      // synced so that the coprocessor contract is adhered to.
       // ------------------------------------
       if (coprocessorHost != null) {
         for (int i = firstIndex; i < lastIndexExclusive; i++) {
@@ -1808,11 +1857,16 @@
       success = true;
       return addedSize;
     } finally {
-      if (locked)
+      if (w != null) rwcc.completeMemstoreInsert(w);
+
+      if (locked) {
         this.updatesLock.readLock().unlock();
+      }
 
-      for (Integer toRelease : acquiredLocks) {
-        releaseRowLock(toRelease);
+      if (acquiredLocks != null) {
+        for (Integer toRelease : acquiredLocks) {
+          releaseRowLock(toRelease);
+        }
       }
       if (!success) {
         for (int i = firstIndex; i < lastIndexExclusive; i++) {
@@ -2050,7 +2104,7 @@
           walEdit, clusterId, now, this.htableDescriptor);
       }
 
-      long addedSize = applyFamilyMapToMemstore(familyMap);
+      long addedSize = applyFamilyMapToMemstore(familyMap, null);
       flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize));
     } finally {
       this.updatesLock.readLock().unlock();
@@ -2075,11 +2129,16 @@
    * @return the additional memory usage of the memstore caused by the
    * new entries.
    */
-  private long applyFamilyMapToMemstore(Map<byte[], List<KeyValue>> familyMap) {
-    ReadWriteConsistencyControl.WriteEntry w = null;
+  private long applyFamilyMapToMemstore(Map<byte[], List<KeyValue>> familyMap,
+      ReadWriteConsistencyControl.WriteEntry w) {
     long size = 0;
+    boolean freerwcc = false;
+
     try {
-      w = rwcc.beginMemstoreInsert();
+      if (w == null) {
+        w = rwcc.beginMemstoreInsert();
+        freerwcc = true;
+      }
 
       for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
         byte[] family = e.getKey();
@@ -2092,7 +2151,9 @@
         }
       }
     } finally {
-      rwcc.completeMemstoreInsert(w);
+      if (freerwcc) {
+        rwcc.completeMemstoreInsert(w);
+      }
     }
 
     return size;
   }
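Taken together, the doMiniBatchPut changes reorder the write path so that the expensive HLog sync happens outside the row locks and the updatesLock, while the RWCC keeps unsynced edits invisible. The condensed, self-contained sketch below shows only that ordering; the Wal and Rwcc interfaces and the Runnable standing in for the memstore write are stubs assumed for illustration, not HBase types, and row-lock handling is elided.

import java.io.IOException;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class WritePathSketch {
  interface Wal {
    long appendNoSync() throws IOException;  // returns a transaction id
    void sync(long txid) throws IOException;
  }
  interface Rwcc {
    Object beginMemstoreInsert();
    void completeMemstoreInsert(Object writeEntry);
  }

  private final ReentrantReadWriteLock updatesLock = new ReentrantReadWriteLock();
  private final Wal log;
  private final Rwcc rwcc;

  WritePathSketch(Wal log, Rwcc rwcc) {
    this.log = log;
    this.rwcc = rwcc;
  }

  void miniBatchPut(Runnable applyToMemstore, boolean hasWalEdits) throws IOException {
    Object w = null;
    boolean locked = false;
    try {
      updatesLock.readLock().lock();   // row locks already held (STEP 1, elided)
      locked = true;

      // Take the rwcc write number first: edits stamped with it stay
      // invisible to readers until completeMemstoreInsert() below.
      w = rwcc.beginMemstoreInsert();

      // STEP 3: memstore before WAL is safe only because the read point
      // has not been advanced yet.
      applyToMemstore.run();

      // STEP 5: append to the WAL without forcing a sync.
      long txid = log.appendNoSync();

      // STEP 6: locks can be released before the sync, shrinking the
      // critical section that serializes concurrent puts.
      updatesLock.readLock().unlock();
      locked = false;

      // STEP 7: wait for WAL durability outside the locks.
      if (hasWalEdits) {
        log.sync(txid);
      }

      // STEP 8: only now make the memstore edits visible to readers.
      rwcc.completeMemstoreInsert(w);
      w = null;
    } finally {
      // On failure the write entry must still be retired, otherwise readers
      // queued behind this write number would be stuck forever.
      if (w != null) rwcc.completeMemstoreInsert(w);
      if (locked) updatesLock.readLock().unlock();
    }
  }
}

Splitting append from sync is the intended throughput win: because appendNoSync returns a transaction id, one sync(txid) call can cover the appends of several concurrent handlers instead of each handler forcing its own flush while holding its locks.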
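The companion change gives applyFamilyMapToMemstore an explicit WriteEntry parameter: the batch put path passes the entry it already owns, while single-operation callers (the delete path and the single-put path in the @@ -2050 hunk above) pass null and keep the old begin-and-complete-locally behavior. A small sketch of that ownership rule, again using assumed stub types rather than HBase's:

class MemstoreApplySketch {
  interface Rwcc {
    Object beginMemstoreInsert();
    void completeMemstoreInsert(Object writeEntry);
  }
  interface MemstoreWrite {
    long apply();   // stamps and inserts the KeyValues, returns added heap size
  }

  private final Rwcc rwcc;

  MemstoreApplySketch(Rwcc rwcc) {
    this.rwcc = rwcc;
  }

  // Mirrors the shape of applyFamilyMapToMemstore(familyMap, w) in the patch.
  long applyToMemstore(MemstoreWrite write, Object w) {
    long size = 0;
    boolean freerwcc = false;   // true only if this call owns the write entry
    try {
      if (w == null) {
        // Single-operation callers: begin and complete locally, so the edit
        // becomes visible as soon as this method returns.
        w = rwcc.beginMemstoreInsert();
        freerwcc = true;
      }
      size = write.apply();
    } finally {
      // Never complete a caller-owned entry: for batch puts that would make
      // the edits visible before the caller has synced the WAL.
      if (freerwcc) {
        rwcc.completeMemstoreInsert(w);
      }
    }
    return size;
  }
}

The freerwcc flag is the design point: the method only retires a write entry it created itself, so a batch's edits cannot become reader-visible any earlier than the batch's own STEP 8.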