Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java (revision 1230675)
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java (working copy)
@@ -19,6 +19,9 @@
 
 import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -26,10 +29,13 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.AtomicRowMutation;
+import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
 
 import org.junit.experimental.categories.Category;
@@ -239,8 +245,98 @@
     }
   }
 
+  /**
+   * Test multi-threaded atomic row mutations.
+   */
+  public void testAtomicMutationMultiThreads() throws IOException {
+    LOG.info("Starting test testAtomicMutationMultiThreads");
+    initHRegion(tableName, getName(), fam1);
+    // create 100 threads, each performing 1000 atomic row mutations
+    int numThreads = 100;
+    int opsPerThread = 1000;
+    AtomicOperation[] all = new AtomicOperation[numThreads];
+
+    AtomicLong timeStamps = new AtomicLong(0);
+    AtomicInteger failures = new AtomicInteger(0);
+    // create all threads
+    for (int i = 0; i < numThreads; i++) {
+      all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures);
+    }
+
+    // run all threads
+    for (int i = 0; i < numThreads; i++) {
+      all[i].start();
+    }
+
+    // wait for all threads to finish
+    for (int i = 0; i < numThreads; i++) {
+      try {
+        all[i].join();
+      } catch (InterruptedException e) {
+      }
+    }
+    assertEquals(0, failures.get());
+  }
+
+
+  public static class AtomicOperation extends Thread {
+    private final HRegion region;
+    private final int numOps;
+    private final AtomicLong timeStamps;
+    private final AtomicInteger failures;
+    private final Random r = new Random();
+
+    public AtomicOperation(HRegion region, int numOps, AtomicLong timeStamps,
+        AtomicInteger failures) {
+      this.region = region;
+      this.numOps = numOps;
+      this.timeStamps = timeStamps;
+      this.failures = failures;
+    }
+
+    @Override
+    public void run() {
+      boolean op = true;
+      for (int i = 0; i < numOps; i++) {
+        try {
+          // throw in the occasional flush
+          if (r.nextFloat() < 0.001) {
+            LOG.debug("flushing");
+            region.flushcache();
+          }
+          // alternate which of the row's two columns is populated, so that
+          // a concurrent reader must always see exactly one of them
+          long ts = timeStamps.incrementAndGet();
+          AtomicRowMutation arm = new AtomicRowMutation(row);
+          if (op) {
+            Put p = new Put(row, ts);
+            p.add(fam1, qual1, value1);
+            arm.add(p);
+            Delete d = new Delete(row);
+            d.deleteColumns(fam1, qual2, ts);
+            arm.add(d);
+          } else {
+            Delete d = new Delete(row);
+            d.deleteColumns(fam1, qual1, ts);
+            arm.add(d);
+            Put p = new Put(row, ts);
+            p.add(fam1, qual2, value2);
+            arm.add(p);
+          }
+          region.atomicMutation(arm, null);
+          op ^= true;
+          // check: we should always see exactly one column
+          Get g = new Get(row);
+          Result res = region.get(g, null);
+          if (res.size() != 1) {
+            LOG.debug(res);
+            failures.incrementAndGet();
+            fail();
+          }
+        } catch (IOException e) {
+          e.printStackTrace();
+          failures.incrementAndGet();
+          fail();
+        }
+      }
+    }
+  }
Index: src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java (revision 1230675)
+++ src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java (working copy)
@@ -353,4 +353,14 @@
+  /**
+   * Performs multiple mutations atomically on a single row.
+   * Currently {@link Put} and {@link Delete} are supported.
+   *
+   * @param arm object that specifies the set of mutations to perform atomically
+   * @throws IOException e
+   */
+  void atomicMutation(AtomicRowMutation arm) throws IOException;
+
   /**
    * Appends values to one or more columns within a single row.
    * <p>
    * This operation does not appear atomic to readers.  Appends are done
Index: src/main/java/org/apache/hadoop/hbase/client/AtomicRowMutation.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/AtomicRowMutation.java (revision 0)
+++ src/main/java/org/apache/hadoop/hbase/client/AtomicRowMutation.java (revision 0)
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.HbaseObjectWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Performs multiple mutations atomically on a single row.
+ * Currently {@link Put} and {@link Delete} are supported.
+ *
+ * The mutations are performed in the order in which they
+ * were added.
+ */
+public class AtomicRowMutation implements Row {
+  private List<Mutation> mutations = new ArrayList<Mutation>();
+  protected byte [] row;
+  private static final byte VERSION = (byte)1;
+
+  /** Constructor for Writable. DO NOT USE */
+  public AtomicRowMutation() {}
+
+  /**
+   * Create an atomic mutation for the specified row.
+   * @param row row key
+   */
+  public AtomicRowMutation(byte [] row) {
+    this.row = Arrays.copyOf(row, row.length);
+  }
+
+  /**
+   * Add a {@link Put} operation to the list of mutations
+   * @param p The {@link Put} to add
+   * @throws IOException
+   */
+  public void add(Put p) throws IOException {
+    internalAdd(p);
+  }
+
+  /**
+   * Add a {@link Delete} operation to the list of mutations
+   * @param d The {@link Delete} to add
+   * @throws IOException
+   */
+  public void add(Delete d) throws IOException {
+    internalAdd(d);
+  }
+
+  private void internalAdd(Mutation m) throws IOException {
+    int res = Bytes.compareTo(this.row, m.getRow());
+    if (res != 0) {
+      throw new IOException("AtomicRowMutation only supports a single row");
+    }
+    mutations.add(m);
+  }
+
+  @Override
+  public void readFields(final DataInput in) throws IOException {
+    in.readByte(); // ignore version for now
+    this.row = Bytes.readByteArray(in);
+    int numMutations = in.readInt();
+    mutations.clear();
+    for (int i = 0; i < numMutations; i++) {
+      mutations.add((Mutation) HbaseObjectWritable.readObject(in, null));
+    }
+  }
+
+  @Override
+  public void write(final DataOutput out) throws IOException {
+    out.writeByte(VERSION);
+    Bytes.writeByteArray(out, this.row);
+    out.writeInt(mutations.size());
+    for (Mutation m : mutations) {
+      HbaseObjectWritable.writeObject(out, m, m.getClass(), null);
+    }
+  }
+
+  @Override
+  public int compareTo(Row other) {
+    return Bytes.compareTo(this.getRow(), other.getRow());
+  }
+
+  @Override
+  public byte [] getRow() {
+    return row;
+  }
+
+  /**
+   * @return an unmodifiable list of the current mutations
+   */
+  public List<Mutation> getMutations() {
+    return Collections.unmodifiableList(mutations);
+  }
+}
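
For reference, a minimal client-side usage sketch of the class above (not part
of the patch; the table name "t", column family "cf", and the qualifiers are
illustrative assumptions):

    // Needs: org.apache.hadoop.conf.Configuration, org.apache.hadoop.hbase.HBaseConfiguration,
    // org.apache.hadoop.hbase.client.*, org.apache.hadoop.hbase.util.Bytes
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "t");
    byte[] row = Bytes.toBytes("row1");

    // Apply a Put and a Delete to the same row as one atomic unit.
    AtomicRowMutation arm = new AtomicRowMutation(row);
    Put p = new Put(row);
    p.add(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("v1"));
    arm.add(p);
    Delete d = new Delete(row);
    d.deleteColumns(Bytes.toBytes("cf"), Bytes.toBytes("q2"));
    arm.add(d);

    // Uses the HTable.atomicMutation() method added below; on the region
    // server both mutations become visible to readers at the same time.
    table.atomicMutation(arm);
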
Index: src/main/java/org/apache/hadoop/hbase/client/HTable.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/HTable.java (revision 1230675)
+++ src/main/java/org/apache/hadoop/hbase/client/HTable.java (working copy)
@@ -738,6 +738,20 @@
    * {@inheritDoc}
    */
   @Override
+  public void atomicMutation(final AtomicRowMutation arm) throws IOException {
+    new ServerCallable<Void>(connection, tableName, arm.getRow(),
+        operationTimeout) {
+      public Void call() throws IOException {
+        server.atomicMutation(location.getRegionInfo().getRegionName(), arm);
+        return null;
+      }
+    }.withRetries();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
   public Result append(final Append append) throws IOException {
     if (append.numFamilies() == 0) {
       throw new IOException(
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1230675)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy)
@@ -77,10 +77,12 @@
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.AtomicRowMutation;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.IsolationLevel;
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Row;
@@ -1684,7 +1686,7 @@
     try {
       // All edits for the given row (across all column families) must happen atomically.
       prepareDelete(delete);
-      internalDelete(delete, delete.getClusterId(), writeToWAL);
+      internalDelete(delete, delete.getClusterId(), writeToWAL, null);
     } finally {
       if(lockid == null) releaseRowLock(lid);
     }
@@ -1705,7 +1707,7 @@
     delete.setFamilyMap(familyMap);
     delete.setClusterId(clusterId);
     delete.setWriteToWAL(writeToWAL);
-    internalDelete(delete, clusterId, writeToWAL);
+    internalDelete(delete, clusterId, writeToWAL, null);
   }
 
   /**
@@ -1714,7 +1716,8 @@
    * @throws IOException
    */
   private void internalDelete(Delete delete, UUID clusterId,
-      boolean writeToWAL) throws IOException {
+      boolean writeToWAL, MultiVersionConsistencyControl.WriteEntry writeEntry)
+      throws IOException {
     Map<byte[], List<KeyValue>> familyMap = delete.getFamilyMap();
     WALEdit walEdit = new WALEdit();
     /* Run coprocessor pre hook outside of locks to avoid deadlock */
@@ -1789,7 +1792,7 @@
       }
 
       // Now make changes to the memstore.
-      long addedSize = applyFamilyMapToMemstore(familyMap, null);
+      long addedSize = applyFamilyMapToMemstore(familyMap, writeEntry);
       flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize));
 
       if (coprocessorHost != null) {
@@ -1870,7 +1873,7 @@
 
     try {
       // All edits for the given row (across all column families) must happen atomically.
-      internalPut(put, put.getClusterId(), writeToWAL);
+      internalPut(put, put.getClusterId(), writeToWAL, null);
     } finally {
       if(lockid == null) releaseRowLock(lid);
     }
@@ -2299,11 +2302,11 @@
       // originating cluster. A slave cluster receives the result as a Put
       // or Delete
       if (isPut) {
-        internalPut(((Put)w), HConstants.DEFAULT_CLUSTER_ID, writeToWAL);
+        internalPut(((Put)w), HConstants.DEFAULT_CLUSTER_ID, writeToWAL, null);
       } else {
         Delete d = (Delete)w;
         prepareDelete(d);
-        internalDelete(d, HConstants.DEFAULT_CLUSTER_ID, writeToWAL);
+        internalDelete(d, HConstants.DEFAULT_CLUSTER_ID, writeToWAL, null);
       }
       return true;
     }
@@ -2398,7 +2401,7 @@
     p.setFamilyMap(familyMap);
     p.setClusterId(HConstants.DEFAULT_CLUSTER_ID);
     p.setWriteToWAL(true);
-    this.internalPut(p, HConstants.DEFAULT_CLUSTER_ID, true);
+    this.internalPut(p, HConstants.DEFAULT_CLUSTER_ID, true, null);
   }
 
   /**
@@ -2408,8 +2411,8 @@
    * @param writeToWAL if true, then we should write to the log
    * @throws IOException
    */
-  private void internalPut(Put put, UUID clusterId,
-      boolean writeToWAL) throws IOException {
+  private void internalPut(Put put, UUID clusterId, boolean writeToWAL,
+      MultiVersionConsistencyControl.WriteEntry writeEntry) throws IOException {
     Map<byte[], List<KeyValue>> familyMap = put.getFamilyMap();
     WALEdit walEdit = new WALEdit();
     /* run pre put hook outside of lock to avoid deadlock */
@@ -2439,7 +2442,7 @@
             walEdit, clusterId, now, this.htableDescriptor);
       }
 
-      long addedSize = applyFamilyMapToMemstore(familyMap, null);
+      long addedSize = applyFamilyMapToMemstore(familyMap, writeEntry);
       flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize));
     } finally {
       this.updatesLock.readLock().unlock();
@@ -4129,6 +4132,35 @@
     return results;
   }
 
+  public int atomicMutation(AtomicRowMutation arm,
+      Integer lockid) throws IOException {
+    Integer lid = getLock(lockid, arm.getRow(), true);
+    this.updatesLock.readLock().lock();
+    MultiVersionConsistencyControl.WriteEntry w = mvcc.beginMemstoreInsert();
+    try {
+      int i = 0;
+      for (Mutation m : arm.getMutations()) {
+        if (m instanceof Put) {
+          internalPut((Put)m, HConstants.DEFAULT_CLUSTER_ID, m.getWriteToWAL(), w);
+        } else if (m instanceof Delete) {
+          Delete d = (Delete)m;
+          prepareDelete(d);
+          internalDelete(d, HConstants.DEFAULT_CLUSTER_ID, d.getWriteToWAL(), w);
+        } else {
+          throw new DoNotRetryIOException("Action must be Put or Delete");
+        }
+        i++;
+      }
+      return i;
+    } finally {
+      mvcc.completeMemstoreInsert(w);
+      this.updatesLock.readLock().unlock();
+      if (lid != null) {
+        releaseRowLock(lid);
+      }
+    }
+  }
+  // TODO: There's a lot of boiler plate code identical
+  // to increment... See how to better unify that.
   /**
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1230675)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy)
@@ -83,6 +83,7 @@
 import org.apache.hadoop.hbase.catalog.RootLocationEditor;
 import org.apache.hadoop.hbase.client.Action;
 import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.AtomicRowMutation;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HConnectionManager;
@@ -3152,6 +3153,27 @@
   }
 
   @Override
+  public void atomicMutation(byte[] regionName, AtomicRowMutation arm)
+      throws IOException {
+    checkOpen();
+    if (regionName == null) {
+      throw new IOException("Invalid arguments to atomicMutation: " +
+          "regionName is null");
+    }
+    requestCount.incrementAndGet();
+    try {
+      HRegion region = getRegion(regionName);
+      if (!region.getRegionInfo().isMetaTable()) {
+        this.cacheFlusher.reclaimMemStoreMemory();
+      }
+      region.atomicMutation(arm, null);
+    } catch (IOException e) {
+      checkFileSystem();
+      throw e;
+    }
+  }
+
+  @Override
   public Result append(byte[] regionName, Append append)
       throws IOException {
     checkOpen();
@@ -3296,6 +3318,9 @@
           } else if (action instanceof Append) {
             response.add(regionName, originalIndex,
                 append(regionName, (Append)action));
+          } else if (action instanceof AtomicRowMutation) {
+            atomicMutation(regionName, (AtomicRowMutation)action);
+            response.add(regionName, originalIndex, new Result());
           } else {
             LOG.debug("Error: invalid Action, row must be a Get, Delete, " +
                 "Put, Exec, Increment, or Append.");
Index: src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java (revision 1230675)
+++ src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java (working copy)
@@ -501,6 +501,11 @@
         byte[] row) {
       return table.coprocessorProxy(protocol, row);
     }
+
+    @Override
+    public void atomicMutation(AtomicRowMutation arm) throws IOException {
+      table.atomicMutation(arm);
+    }
   }
 
   /** The coprocessor */
Index: src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java (revision 1230675)
+++ src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java (working copy)
@@ -48,6 +48,7 @@
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.AtomicRowMutation;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Increment;
@@ -236,6 +237,8 @@
     addToMap(RegionOpeningState.class, code++);
 
     addToMap(Append.class, code++);
+
+    addToMap(AtomicRowMutation.class, code++);
   }
 
   private Class<?> declaredClass;
Index: src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (revision 1230675)
+++ src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (working copy)
@@ -29,6 +29,7 @@
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.AtomicRowMutation;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Increment;
@@ -262,6 +263,9 @@
       byte [] family, byte [] qualifier, long amount, boolean writeToWAL)
   throws IOException;
 
+  public void atomicMutation(byte[] regionName, AtomicRowMutation arm)
+      throws IOException;
+
   /**
    * Appends values to one or more columns values in a row. Optionally
    * Returns the updated keys after the append.
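
Because AtomicRowMutation implements Row and the multi() dispatch in
HRegionServer above recognizes it, a mixed batch should also be able to carry
one. A hedged sketch, continuing the earlier snippet (the client-side batching
path is not exercised by the test in this patch; per the multi() change, a
successful atomic mutation comes back as an empty Result):

    List<Row> actions = new ArrayList<Row>();
    actions.add(arm);  // the AtomicRowMutation from the earlier sketch
    Put other = new Put(Bytes.toBytes("row2"));
    other.add(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("v1"));
    actions.add(other);
    Object[] results = new Object[actions.size()];
    table.batch(actions, results);  // throws IOException, InterruptedException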