Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestParallelPut.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/regionserver/TestParallelPut.java (revision 0)
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestParallelPut.java (revision 0)
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestCase;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HDFSBlocksDistribution;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MultithreadedTestUtil;
+import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.NullComparator;
+import org.apache.hadoop.hbase.filter.PrefixFilter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
+import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
+import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.PairOfSameType;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+
+/**
+ * Testing of multiPut in parallel.
+ *
+ */
+public class TestParallelPut extends HBaseTestCase {
+  static final Log LOG = LogFactory.getLog(TestParallelPut.class);
+
+  static private HRegion region = null;
+  private final String DIR = HBaseTestingUtility.getTestDir() +
+    "/TestParallelPut/";
+
+  private final int MAX_VERSIONS = 2;
+
+  // Test names
+  static final byte[] tableName = Bytes.toBytes("testtable");
+  static final byte[] qual1 = Bytes.toBytes("qual1");
+  static final byte[] qual2 = Bytes.toBytes("qual2");
+  static final byte[] qual3 = Bytes.toBytes("qual3");
+  static final byte[] value1 = Bytes.toBytes("value1");
+  static final byte[] value2 = Bytes.toBytes("value2");
+  static final byte [] row = Bytes.toBytes("rowA");
+  static final byte [] row2 = Bytes.toBytes("rowB");
+
+  /**
+   * @see org.apache.hadoop.hbase.HBaseTestCase#setUp()
+   */
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+  }
+
+  @Override
+  protected void tearDown() throws Exception {
+    super.tearDown();
+    EnvironmentEdgeManagerTestHelper.reset();
+  }
+
+  //////////////////////////////////////////////////////////////////////////////
+  // New tests that don't spin up a mini cluster but rather just test the
+  // individual code pieces in the HRegion.
+  //////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Test one put command.
+   */
+  public void testPut() throws IOException {
+    LOG.info("Starting testPut");
+    initHRegion(tableName, getName(), fam1);
+
+    long value = 1L;
+
+    Put put = new Put(row);
+    put.add(fam1, qual1, Bytes.toBytes(value));
+    region.put(put);
+
+    assertGet(row, fam1, qual1, Bytes.toBytes(value));
+  }
+
+  /**
+   * Test multi-threaded Puts.
+   */
+  public void testParallelPuts() throws IOException {
+
+    LOG.info("Starting testParallelPuts");
+    initHRegion(tableName, getName(), fam1);
+    int numOps = 1000; // this many operations per thread
+
+    // create 100 threads, each will do its own puts
+    int numThreads = 100;
+    Putter[] all = new Putter[numThreads];
+
+    // create all threads
+    for (int i = 0; i < numThreads; i++) {
+      all[i] = new Putter(region, i, numOps);
+    }
+
+    // run all threads
+    for (int i = 0; i < numThreads; i++) {
+      all[i].start();
+    }
+
+    // wait for all threads to finish
+    for (int i = 0; i < numThreads; i++) {
+      try {
+        all[i].join();
+      } catch (InterruptedException e) {
+        LOG.warn("testParallelPuts encountered InterruptedException." +
+                 " Ignoring....", e);
+      }
+    }
+    LOG.info("testParallelPuts successfully verified " +
+             (numOps * numThreads) + " put operations.");
+  }
+
+
+  static private void assertGet(byte [] row,
+                                byte [] family,
+                                byte[] qualifier,
+                                byte[] value) throws IOException {
+    // run a get and see if the value matches
+    Get get = new Get(row);
+    get.addColumn(family, qualifier);
+    Result result = region.get(get, null);
+    assertEquals(1, result.size());
+
+    KeyValue kv = result.raw()[0];
+    byte[] r = kv.getValue();
+    assertTrue(Bytes.compareTo(r, value) == 0);
+  }
+
+  private void initHRegion(byte [] tableName, String callingMethod,
+      byte[] ... families)
+    throws IOException {
+    initHRegion(tableName, callingMethod, HBaseConfiguration.create(), families);
+  }
+
+  private void initHRegion(byte [] tableName, String callingMethod,
+      Configuration conf, byte [] ... families)
+    throws IOException {
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    for (byte [] family : families) {
+      htd.addFamily(new HColumnDescriptor(family));
+    }
+    HRegionInfo info = new HRegionInfo(htd.getName(), null, null, false);
+    Path path = new Path(DIR + callingMethod);
+    if (fs.exists(path)) {
+      if (!fs.delete(path, true)) {
+        throw new IOException("Failed delete of " + path);
+      }
+    }
+    region = HRegion.createHRegion(info, path, conf, htd);
+  }
+
+  /**
+   * A thread that makes a few put calls
+   */
+  public static class Putter extends Thread {
+
+    private final HRegion region;
+    private final int threadNumber;
+    private final int numOps;
+    private final Random rand = new Random();
+    byte [] rowkey = null;
+
+    public Putter(HRegion region, int threadNumber, int numOps) {
+      this.region = region;
+      this.threadNumber = threadNumber;
+      this.numOps = numOps;
+      this.rowkey = Bytes.toBytes((long)threadNumber); // unique rowid per thread
+      setDaemon(true);
+    }
+
+    @Override
+    public void run() {
+      byte[] value = new byte[100];
+      Put[] in = new Put[1];
+
+      // iterate for the specified number of operations
+      for (int i = 0; i < numOps; i++) {
Index: src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java (revision 1179529)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/ReadWriteConsistencyControl.java (working copy)
+    if (memstoreRead >= e.getWriteNumber()) {
+      return true;
+    }
+    return false;
   }
 
+  /**
+   * Wait for the global readPoint to advance upto
+   * the specified transaction number.
+   */
+  public void waitForRead(WriteEntry e) {
     boolean interrupted = false;
     synchronized (readWaiters) {
       while (memstoreRead < e.getWriteNumber()) {
Index: src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (revision 1179529)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (working copy)
@@ -1240,10 +1240,9 @@
           requestLogRoll();
         }
       }
-    } catch (IOException e) {
-      LOG.fatal("Could not sync. Requesting close of hlog", e);
-      requestLogRoll();
-      throw e;
+    } catch (Throwable e) {
+      LOG.fatal("Could not sync. Exiting regionserver..", e);
+      Runtime.getRuntime().exit(-100);
     }
   }
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1179529)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy)
@@ -1139,6 +1139,10 @@
     long sequenceId = -1L;
     long completeSequenceId = -1L;
 
+    // record the rwcc for all transactions in progress.
+    ReadWriteConsistencyControl.WriteEntry w = rwcc.beginMemstoreInsert();
+    rwcc.advanceMemstore(w);
+
     // We have to take a write lock during snapshot, or else a write could
     // end up in both snapshot and memstore (makes it difficult to do atomic
     // rows then)
@@ -1175,15 +1179,24 @@
     try {
       // A. Flush memstore to all the HStores.
       // Keep running vector of all store files that includes both old and the
-      // just-made new flush store file.
+      // just-made new flush store file. The new flushed file is still in the
+      // tmp directory.
       for (StoreFlusher flusher : storeFlushers) {
        flusher.flushCache(status);
       }
+
+      // wait for all in-progress transactions to commit to HLog before
+      // we can commit the newly created storefiles. This prevents
+      // uncommitted transactions from being written into HFiles.
+      // This can block while holding the cacheFlushLock, but that
+      // should be ok.
+      rwcc.waitForRead(w);
+
       // Switch snapshot (in memstore) -> new hfile (thus causing
       // all the store scanners to reset/reseek).
       for (StoreFlusher flusher : storeFlushers) {
-        boolean needsCompaction = flusher.commit();
+        boolean needsCompaction = flusher.commit(status);
         if (needsCompaction) {
           compactionRequested = true;
         }
@@ -1412,11 +1425,12 @@
   }
 
   /**
+   * This is used only by unit tests. Not required to be a public API.
    * @param familyMap map of family to edits for the given family.
    * @param writeToWAL
    * @throws IOException
    */
-  public void delete(Map<byte[], List<KeyValue>> familyMap, UUID clusterId,
+  void delete(Map<byte[], List<KeyValue>> familyMap, UUID clusterId,
       boolean writeToWAL) throws IOException {
     Delete delete = new Delete();
     delete.setFamilyMap(familyMap);
@@ -1506,7 +1520,7 @@
     }
 
     // Now make changes to the memstore.
-    long addedSize = applyFamilyMapToMemstore(familyMap);
+    long addedSize = applyFamilyMapToMemstore(familyMap, null);
     flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize));
 
     if (coprocessorHost != null) {
@@ -1674,8 +1688,8 @@
       }
     }
 
-    long now = EnvironmentEdgeManager.currentTimeMillis();
-    byte[] byteNow = Bytes.toBytes(now);
+    ReadWriteConsistencyControl.WriteEntry w = null;
+    long txid = 0;
     boolean locked = false;
 
    /** Keep track of the locks we hold so we can release them in finally clause */
@@ -1734,6 +1748,12 @@
        lastIndexExclusive++;
        numReadyToWrite++;
      }
+
+      // we should record the timestamp only after we have acquired the rowLock,
+      // otherwise, newer puts are not guaranteed to have a newer timestamp
+      long now = EnvironmentEdgeManager.currentTimeMillis();
+      byte[] byteNow = Bytes.toBytes(now);
+
      // Nothing to put -- an exception in the above such as NoSuchColumnFamily?
      if (numReadyToWrite <= 0) return 0L;
@@ -1752,46 +1772,86 @@
            byteNow);
      }
 
-      this.updatesLock.readLock().lock();
      locked = true;
+      //
      // ------------------------------------
-      // STEP 3. Write to WAL
+      // Acquire the latest rwcc number
      // ----------------------------------
+      w = rwcc.beginMemstoreInsert();
+
+      // ------------------------------------
+      // STEP 3. Write back to memstore
+      // Write to memstore. It is ok to write to memstore
+      // first without updating the HLog because we do not roll
+      // forward the memstore RWCC. The RWCC will be moved up when
+      // the complete operation is done. These changes are not yet
+      // visible to scanners till we update the RWCC. The RWCC is
+      // moved only when the sync is complete.
+      // ----------------------------------
+      long addedSize = 0;
      for (int i = firstIndex; i < lastIndexExclusive; i++) {
+        if (batchOp.retCodeDetails[i].getOperationStatusCode()
+            != OperationStatusCode.NOT_RUN) {
+          continue;
+        }
+        addedSize += applyFamilyMapToMemstore(familyMaps[i], w);
+      }
+
+      // ------------------------------------
+      // STEP 4. Write to WAL
+      // ----------------------------------
+      for (int i = firstIndex; i < lastIndexExclusive; i++) {
        // Skip puts that were determined to be invalid during preprocessing
        if (batchOp.retCodeDetails[i].getOperationStatusCode()
            != OperationStatusCode.NOT_RUN) {
          continue;
        }
+        batchOp.retCodeDetails[i] = new OperationStatus(OperationStatusCode.SUCCESS);
 
        Put p = batchOp.operations[i].getFirst();
        if (!p.getWriteToWAL()) continue;
 
        addFamilyMapToWALEdit(familyMaps[i], walEdit);
      }
 
-      // Append the edit to WAL
+      // -------------------------
+      // STEP 5. Append the edit to WAL. Do not sync wal.
+      // -------------------------
      Put first = batchOp.operations[firstIndex].getFirst();
-      this.log.append(regionInfo, this.htableDescriptor.getName(),
-        walEdit, first.getClusterId(), now, this.htableDescriptor);
+      txid = this.log.appendNoSync(regionInfo, this.htableDescriptor.getName(),
+        walEdit, first.getClusterId(), now, this.htableDescriptor);
 
-      // ------------------------------------
-      // STEP 4. Write back to memstore
-      // ----------------------------------
-      long addedSize = 0;
-      for (int i = firstIndex; i < lastIndexExclusive; i++) {
-        if (batchOp.retCodeDetails[i].getOperationStatusCode()
-            != OperationStatusCode.NOT_RUN) {
-          continue;
+      // -------------------------------
+      // STEP 6. Release row locks, etc.
+      // -------------------------------
+      if (locked) {
+        this.updatesLock.readLock().unlock();
+        locked = false;
+      }
+      if (acquiredLocks != null) {
+        for (Integer toRelease : acquiredLocks) {
+          releaseRowLock(toRelease);
        }
-        addedSize += applyFamilyMapToMemstore(familyMaps[i]);
-        batchOp.retCodeDetails[i] = new OperationStatus(
-            OperationStatusCode.SUCCESS);
+        acquiredLocks = null;
      }
+      // -------------------------
+      // STEP 7. Sync wal.
+      // -------------------------
+      if (walEdit.size() > 0) {
+        this.log.sync(txid);
+      }
+      // ------------------------------------------------------------------
+      // STEP 8. Advance rwcc. This will make this put visible to scanners and getters.
+      // ------------------------------------------------------------------
+      if (w != null) {
+        rwcc.completeMemstoreInsert(w);
+        w = null;
+      }
      // ------------------------------------
-      // STEP 5. Run coprocessor post hooks
+      // STEP 9. Run coprocessor post hooks. This should be done after the wal is
+      // synced so that the coprocessor contract is adhered to.
      // ------------------------------------
      if (coprocessorHost != null) {
        for (int i = firstIndex; i < lastIndexExclusive; i++) {
@@ -1808,11 +1868,16 @@
      success = true;
      return addedSize;
    } finally {
-      if (locked)
+      if (w != null) rwcc.completeMemstoreInsert(w);
+
+      if (locked) {
        this.updatesLock.readLock().unlock();
+      }
 
-      for (Integer toRelease : acquiredLocks) {
-        releaseRowLock(toRelease);
+      if (acquiredLocks != null) {
+        for (Integer toRelease : acquiredLocks) {
+          releaseRowLock(toRelease);
+        }
      }
      if (!success) {
        for (int i = firstIndex; i < lastIndexExclusive; i++) {
@@ -2050,7 +2115,7 @@
          walEdit, clusterId, now, this.htableDescriptor);
    }
 
-    long addedSize = applyFamilyMapToMemstore(familyMap);
+    long addedSize = applyFamilyMapToMemstore(familyMap, null);
    flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize));
  } finally {
    this.updatesLock.readLock().unlock();
@@ -2072,14 +2137,21 @@
   * should already have locked updatesLock.readLock(). This also does
   * not check the families for validity.
   *
+   * @param familyMap Map of kvs per family
+   * @param localizedWriteEntry The WriteEntry of the RWCC for this transaction
   * @return the additional memory usage of the memstore caused by the
   * new entries.
   */
-  private long applyFamilyMapToMemstore(Map<byte[], List<KeyValue>> familyMap) {
-    ReadWriteConsistencyControl.WriteEntry w = null;
+  private long applyFamilyMapToMemstore(Map<byte[], List<KeyValue>> familyMap,
+    ReadWriteConsistencyControl.WriteEntry localizedWriteEntry) {
    long size = 0;
+    boolean freerwcc = false;
+
    try {
-      w = rwcc.beginMemstoreInsert();
+      if (localizedWriteEntry == null) {
+        localizedWriteEntry = rwcc.beginMemstoreInsert();
+        freerwcc = true;
+      }
 
      for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
        byte[] family = e.getKey();
@@ -2087,12 +2159,14 @@
        Store store = getStore(family);
        for (KeyValue kv: edits) {
-          kv.setMemstoreTS(w.getWriteNumber());
+          kv.setMemstoreTS(localizedWriteEntry.getWriteNumber());
          size += store.add(kv);
        }
      }
    } finally {
-      rwcc.completeMemstoreInsert(w);
+      if (freerwcc) {
+        rwcc.completeMemstoreInsert(localizedWriteEntry);
+      }
    }
    return size;
  }
Index: src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (revision 1179529)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (working copy)
@@ -446,10 +446,10 @@
   * @param logCacheFlushId flush sequence number
   * @param snapshot
   * @param snapshotTimeRangeTracker
-   * @return true if a compaction is needed
+   * @return Path The path name of the tmp file to which the store was flushed
   * @throws IOException
   */
-  private StoreFile flushCache(final long logCacheFlushId,
+  private Path flushCache(final long logCacheFlushId,
      SortedSet<KeyValue> snapshot,
      TimeRangeTracker snapshotTimeRangeTracker,
      MonitoredTask status) throws IOException {
@@ -463,16 +463,17 @@
  /*
   * @param cache
   * @param logCacheFlushId
-   * @return StoreFile created.
+   * @return Path The path name of the tmp file to which the store was flushed
   * @throws IOException
   */
-  private StoreFile internalFlushCache(final SortedSet<KeyValue> set,
+  private Path internalFlushCache(final SortedSet<KeyValue> set,
      final long logCacheFlushId,
      TimeRangeTracker snapshotTimeRangeTracker,
      MonitoredTask status)
      throws IOException {
    StoreFile.Writer writer = null;
    long flushed = 0;
+    Path pathName;
    // Don't flush if there are no entries.
    if (set.size() == 0) {
      return null;
    }
@@ -494,6 +495,7 @@
      // A. Write the map out to the disk
      writer = createWriterInTmp(set.size());
      writer.setTimeRangeTracker(snapshotTimeRangeTracker);
+      pathName = writer.getPath();
      try {
        List<KeyValue> kvs = new ArrayList<KeyValue>();
        boolean hasMore;
@@ -519,14 +521,34 @@
      } finally {
        scanner.close();
      }
+      if (LOG.isInfoEnabled()) {
+        LOG.info("Flushed " +
+                 ", sequenceid=" + logCacheFlushId +
+                 ", memsize=" + StringUtils.humanReadableInt(flushed) +
+                 ", into tmp file " + pathName);
+      }
+      return pathName;
+  }
+
+  /*
+   * @param path The pathname of the tmp file into which the store was flushed
+   * @param logCacheFlushId
+   * @return StoreFile created.
+   * @throws IOException
+   */
+  private StoreFile commitFile(final Path path,
+      final long logCacheFlushId,
+      TimeRangeTracker snapshotTimeRangeTracker,
+      MonitoredTask status)
+      throws IOException {
      // Write-out finished successfully, move into the right spot
+      String fileName = path.getName();
      Path dstPath = StoreFile.getUniqueFile(fs, homedir);
-      String msg = "Renaming flushed file at " + writer.getPath() + " to " + dstPath;
+      String msg = "Renaming flushed file at " + path + " to " + dstPath;
      LOG.info(msg);
      status.setStatus("Flushing " + this + ": " + msg);
-      if (!fs.rename(writer.getPath(), dstPath)) {
-        LOG.warn("Unable to rename " + writer.getPath() + " to " + dstPath);
+      if (!fs.rename(path, dstPath)) {
+        LOG.warn("Unable to rename " + path + " to " + dstPath);
      }
 
      status.setStatus("Flushing " + this + ": reopening flushed file");
@@ -538,7 +560,6 @@
    if(LOG.isInfoEnabled()) {
      LOG.info("Added " + sf + ", entries=" + r.getEntries() +
        ", sequenceid=" + logCacheFlushId +
-        ", memsize=" + StringUtils.humanReadableInt(flushed) +
        ", filesize=" + StringUtils.humanReadableInt(r.length()));
    }
    return sf;
@@ -1750,6 +1771,7 @@
    private long cacheFlushId;
    private SortedSet<KeyValue> snapshot;
    private StoreFile storeFile;
+    private Path storeFilePath;
    private TimeRangeTracker snapshotTimeRangeTracker;
 
    private StoreFlusherImpl(long cacheFlushId) {
@@ -1765,15 +1787,17 @@
    @Override
    public void flushCache(MonitoredTask status) throws IOException {
-      storeFile = Store.this.flushCache(
+      storeFilePath = Store.this.flushCache(
        cacheFlushId, snapshot, snapshotTimeRangeTracker, status);
    }
 
    @Override
-    public boolean commit() throws IOException {
-      if (storeFile == null) {
+    public boolean commit(MonitoredTask status) throws IOException {
+      if (storeFilePath == null) {
        return false;
      }
+      storeFile = Store.this.commitFile(storeFilePath, cacheFlushId,
+        snapshotTimeRangeTracker, status);
      // Add new file to store files. Clear snapshot too while we have
      // the Store write lock.
      return Store.this.updateStorefiles(storeFile, snapshot);
Index: src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java (revision 1179529)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java (working copy)
@@ -60,5 +60,5 @@
   * @return
   * @throws IOException
   */
-  boolean commit() throws IOException;
+  boolean commit(MonitoredTask status) throws IOException;
 }
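
Reviewer note: for readers not familiar with the read-point mechanics that the ReadWriteConsistencyControl fragment and the rwcc.waitForRead(w) call in the flush path rely on, the following is a minimal, self-contained wait/notify sketch of the idea. It is illustrative only; the class and member names (ReadPointSketch, advanceTo) are made up here and are not the patch's API.

// Minimal sketch of the waitForRead idea: a flusher blocks until the global
// read point has advanced past a given write number, i.e. until every
// in-flight transaction up to that point has become visible.
public class ReadPointSketch {
  private long memstoreRead = 0;          // stand-in for the rwcc read point
  private final Object readWaiters = new Object();

  // Called when a transaction's edits become visible (STEP 8 in the put path).
  void advanceTo(long writeNumber) {
    synchronized (readWaiters) {
      if (writeNumber > memstoreRead) memstoreRead = writeNumber;
      readWaiters.notifyAll();
    }
  }

  // Called by the flush path before committing new store files.
  void waitForRead(long writeNumber) throws InterruptedException {
    synchronized (readWaiters) {
      while (memstoreRead < writeNumber) {
        readWaiters.wait();
      }
    }
  }

  public static void main(String[] args) throws InterruptedException {
    ReadPointSketch rwcc = new ReadPointSketch();
    Thread flusher = new Thread(() -> {
      try {
        rwcc.waitForRead(1);
        System.out.println("flush may commit");
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
      }
    });
    flusher.start();
    rwcc.advanceTo(1);   // unblocks the flusher
    flusher.join();
  }
}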
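The doMiniBatchPut hunks reorder the write path so the row locks can be released before the WAL sync: write to memstore (invisible), append to the WAL without syncing, release locks, group-commit the WAL, then advance the rwcc read point. The sketch below is a simplified, self-contained model of that ordering under assumed names (MiniBatchSketch, WriteEntry); it is not the HRegion code and uses counters in place of the real memstore, WAL, and locks.

// Simplified model of the step ordering introduced by this patch.
import java.util.concurrent.atomic.AtomicLong;

public class MiniBatchSketch {
  static class WriteEntry {
    final long writeNumber;
    WriteEntry(long n) { writeNumber = n; }
  }

  private final AtomicLong memstoreWrite = new AtomicLong(0);
  private final AtomicLong memstoreRead  = new AtomicLong(0);
  private final AtomicLong walTxid       = new AtomicLong(0);
  private final AtomicLong syncedTxid    = new AtomicLong(0);

  // Acquire rwcc number: edits tagged with it stay invisible until the read point catches up.
  WriteEntry beginMemstoreInsert() { return new WriteEntry(memstoreWrite.incrementAndGet()); }

  long appendNoSync(String edit) { return walTxid.incrementAndGet(); }                    // STEP 5
  void sync(long txid) { syncedTxid.updateAndGet(prev -> Math.max(prev, txid)); }         // STEP 7
  void completeMemstoreInsert(WriteEntry w) { memstoreRead.set(w.writeNumber); }          // STEP 8

  long put(String edit) {
    WriteEntry w = beginMemstoreInsert();  // acquire the latest rwcc number
    // STEP 3: write to memstore first; not yet visible to scanners
    long txid = appendNoSync(edit);        // STEP 5: append WAL entry, do not sync
    // STEP 6: row locks could be released here, before the sync
    sync(txid);                            // STEP 7: group-commit the WAL
    completeMemstoreInsert(w);             // STEP 8: advance the read point -> now visible
    return w.writeNumber;
  }

  public static void main(String[] args) {
    MiniBatchSketch region = new MiniBatchSketch();
    System.out.println("visible at write number " + region.put("rowA/fam1:qual1=value1"));
  }
}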
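The Store.java hunks split the flush into two phases: flushCache now only writes the snapshot into a tmp file, and commitFile renames it into the store directory only after HRegion has called rwcc.waitForRead(w), so no uncommitted edits end up in a live HFile. The sketch below mirrors that two-phase shape with plain java.nio file operations; the paths and method names are made up for illustration and do not match the HDFS-based Store code.

// Self-contained sketch of a two-phase flush: write to tmp, then commit by rename.
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;

public class TwoPhaseFlushSketch {
  // Phase 1 (flushCache equivalent): write the snapshot into a tmp path and return it.
  static Path flushToTmp(List<String> snapshot, Path tmpDir) throws IOException {
    Files.createDirectories(tmpDir);
    Path tmp = Files.createTempFile(tmpDir, "flush-", ".tmp");
    Files.write(tmp, snapshot);   // nothing is visible to readers yet
    return tmp;
  }

  // Phase 2 (commitFile equivalent): only after in-flight transactions have synced
  // their WAL entries do we move the file into the store directory.
  static Path commitFile(Path tmp, Path storeDir) throws IOException {
    Files.createDirectories(storeDir);
    Path dst = storeDir.resolve(tmp.getFileName().toString().replace(".tmp", ".sf"));
    return Files.move(tmp, dst, StandardCopyOption.ATOMIC_MOVE);  // rename, like fs.rename
  }

  public static void main(String[] args) throws IOException {
    Path tmp = flushToTmp(List.of("rowA", "rowB"), Path.of("build", "tmp"));
    // ... rwcc.waitForRead(w) would block here in the real flush path ...
    System.out.println("committed to " + commitFile(tmp, Path.of("build", "store")));
  }
}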