Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java (revision 1239953)
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java (working copy)
@@ -19,6 +19,8 @@
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -29,11 +31,13 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.MultiRowMutation;
 import org.apache.hadoop.hbase.client.RowMutation;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
@@ -246,11 +250,11 @@
   }
 
   /**
-   * Test multi-threaded increments.
+   * Test multi-threaded row mutations.
    */
   public void testRowMutationMultiThreads() throws IOException {
-    LOG.info("Starting test testMutationMultiThreads");
+    LOG.info("Starting test testRowMutationMultiThreads");
     initHRegion(tableName, getName(), fam1);
 
     // create 100 threads, each will alternate between adding and
@@ -263,7 +267,52 @@
     AtomicInteger failures = new AtomicInteger(0);
     // create all threads
     for (int i = 0; i < numThreads; i++) {
-      all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures);
+      all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures) {
+        @Override
+        public void run() {
+          boolean op = true;
+          for (int i=0; i<numOps; i++) {
+            try {
+              // throw in some flushes
+              if (r.nextFloat() < 0.001) {
+                LOG.debug("flushing");
+                region.flushcache();
+              }
+              long ts = timeStamps.incrementAndGet();
+              RowMutation rm = new RowMutation(row);
+              if (op) {
+                Put p = new Put(row, ts);
+                p.add(fam1, qual1, value1);
+                rm.add(p);
+                Delete d = new Delete(row);
+                d.deleteColumns(fam1, qual2, ts);
+                rm.add(d);
+              } else {
+                Delete d = new Delete(row);
+                d.deleteColumns(fam1, qual1, ts);
+                rm.add(d);
+                Put p = new Put(row, ts);
+                p.add(fam1, qual2, value2);
+                rm.add(p);
+              }
+              region.mutateRow(rm, null);
+              op ^= true;
+              // check: should always see exactly one column
+              Get g = new Get(row);
+              Result r = region.get(g, null);
+              if (r.size() != 1) {
+                LOG.debug(r);
+                failures.incrementAndGet();
+                fail();
+              }
+            } catch (IOException e) {
+              e.printStackTrace();
+              failures.incrementAndGet();
+              fail();
+            }
+          }
+        }
+      };
     }
 
     // run all threads
@@ -280,61 +329,101 @@
     assertEquals(0, failures.get());
   }
 
+  /**
+   * Test multi-threaded multi-row mutations.
+   */
+  public void testMultiRowMutationMultiThreads() throws IOException {
+    LOG.info("Starting test testMultiRowMutationMultiThreads");
+    initHRegion(tableName, getName(), fam1);
+
+    // create 100 threads, each will alternate between adding and
+    // removing a column
+    int numThreads = 100;
+    int opsPerThread = 1000;
+    AtomicOperation[] all = new AtomicOperation[numThreads];
+
+    AtomicLong timeStamps = new AtomicLong(0);
+    AtomicInteger failures = new AtomicInteger(0);
+    final byte[] row2 = Bytes.toBytes("row2");
+
+    // create all threads
+    for (int i = 0; i < numThreads; i++) {
+      all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures) {
+        @Override
+        public void run() {
+          boolean op = true;
+          for (int i=0; i<numOps; i++) {
+            try {
+              // throw in some flushes
+              if (r.nextFloat() < 0.001) {
+                LOG.debug("flushing");
+                region.flushcache();
+              }
+              long ts = timeStamps.incrementAndGet();
+              MultiRowMutation mrm = new MultiRowMutation();
+              if (op) {
+                Put p = new Put(row, ts);
+                p.add(fam1, qual1, value1);
+                mrm.add(p);
+                Delete d = new Delete(row2);
+                d.deleteColumns(fam1, qual1, ts);
+                mrm.add(d);
+              } else {
+                Delete d = new Delete(row);
+                d.deleteColumns(fam1, qual1, ts);
+                mrm.add(d);
+                Put p = new Put(row2, ts);
+                p.add(fam1, qual1, value1);
+                mrm.add(p);
+              }
+              region.mutateRows(mrm);
+              op ^= true;
+              // check: should always see exactly one column
+              Scan s = new Scan(row);
+              RegionScanner rs = region.getScanner(s);
+              List<KeyValue> r = new ArrayList<KeyValue>();
+              while(rs.next(r));
+              if (r.size() != 1) {
+                LOG.debug(r);
+                failures.incrementAndGet();
+                fail();
+              }
+            } catch (IOException e) {
+              e.printStackTrace();
+              failures.incrementAndGet();
+              fail();
+            }
+          }
+        }
+      };
+    }
+
+    // run all threads
+    for (int i = 0; i < numThreads; i++) {
+      all[i].start();
+    }
+
+    // wait for all threads to finish
+    for (int i = 0; i < numThreads; i++) {
+      try {
+        all[i].join();
+      } catch (InterruptedException e) {
+      }
+    }
+    assertEquals(0, failures.get());
+  }
+
   public static class AtomicOperation extends Thread {
-    private final HRegion region;
-    private final int numOps;
-    private final AtomicLong timeStamps;
-    private final AtomicInteger failures;
-    private final Random r = new Random();
-    public AtomicOperation(HRegion region, int numOps, AtomicLong timeStamps, AtomicInteger failures) {
+    protected final HRegion region;
+    protected final int numOps;
+    protected final AtomicLong timeStamps;
+    protected final AtomicInteger failures;
+    protected final Random r = new Random();
+
+    public AtomicOperation(HRegion region, int numOps, AtomicLong timeStamps,
+        AtomicInteger failures) {
       this.region = region;
      this.numOps = numOps;
       this.timeStamps = timeStamps;
       this.failures = failures;
     }
-
-    @Override
-    public void run() {
-      boolean op = true;
-      for (int i=0; i<numOps; i++) {
-        try {
-          // throw in some flushes
-          if (r.nextFloat() < 0.001) {
-            LOG.debug("flushing");
-            region.flushcache();
-          }
-          long ts = timeStamps.incrementAndGet();
-          RowMutation rm = new RowMutation(row);
-          if (op) {
-            Put p = new Put(row, ts);
-            p.add(fam1, qual1, value1);
-            rm.add(p);
-            Delete d = new Delete(row);
-            d.deleteColumns(fam1, qual2, ts);
-            rm.add(d);
-          } else {
-            Delete d = new Delete(row);
-            d.deleteColumns(fam1, qual1, ts);
-            rm.add(d);
-            Put p = new Put(row, ts);
-            p.add(fam1, qual2, value2);
-            rm.add(p);
-          }
-          region.mutateRow(rm, null);
-          op ^= true;
-          // check: should always see exactly one column
-          Get g = new Get(row);
-          Result r = region.get(g, null);
-          if (r.size() != 1) {
-            LOG.debug(r);
-            failures.incrementAndGet();
-            fail();
-          }
-        } catch (IOException e) {
-          e.printStackTrace();
-          failures.incrementAndGet();
-          fail();
-        }
-      }
-    }
   }
Index: src/main/java/org/apache/hadoop/hbase/client/MultiRowMutation.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/MultiRowMutation.java (revision 0)
+++ src/main/java/org/apache/hadoop/hbase/client/MultiRowMutation.java (working copy)
@@ -0,0 +1,74 @@
+package org.apache.hadoop.hbase.client;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.HbaseObjectWritable;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Container for a list of {@link Mutation}s ({@link Put} or {@link Delete})
+ * that are to be performed atomically, potentially spanning multiple rows
+ * of a single region.
+ */
+public class MultiRowMutation implements Writable {
+  private List<Mutation> mutations = new ArrayList<Mutation>();
+  private static final byte VERSION = (byte)0;
+
+  public MultiRowMutation() {}
+
+  /**
+   * Add a {@link Put} operation to the list of mutations
+   * @param p The {@link Put} to add
+   * @throws IOException
+   */
+  public void add(Put p) throws IOException {
+    internalAdd(p);
+  }
+
+  /**
+   * Add a {@link Delete} operation to the list of mutations
+   * @param d The {@link Delete} to add
+   * @throws IOException
+   */
+  public void add(Delete d) throws IOException {
+    internalAdd(d);
+  }
+
+  protected void internalAdd(Mutation m) throws IOException {
+    mutations.add(m);
+  }
+
+  @Override
+  public void readFields(final DataInput in) throws IOException {
+    int version = in.readByte();
+    if (version > VERSION) {
+      throw new IOException("version not supported");
+    }
+    int numMutations = in.readInt();
+    mutations.clear();
+    for(int i = 0; i < numMutations; i++) {
+      mutations.add((Mutation) HbaseObjectWritable.readObject(in, null));
+    }
+  }
+
+  @Override
+  public void write(final DataOutput out) throws IOException {
+    out.writeByte(VERSION);
+    out.writeInt(mutations.size());
+    for (Mutation m : mutations) {
+      HbaseObjectWritable.writeObject(out, m, m.getClass(), null);
+    }
+  }
+
+  /**
+   * @return An unmodifiable list of the current mutations.
+   */
+  public List<Mutation> getMutations() {
+    return Collections.unmodifiableList(mutations);
+  }
+}
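[Reviewer note, not part of the patch: a minimal client-side sketch of the intended usage of the new class. Table and row names are illustrative only; both rows must reside in the same region (see the HTableInterface javadoc below).]

    MultiRowMutation mrm = new MultiRowMutation();
    Put p = new Put(Bytes.toBytes("row1"));
    p.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
    mrm.add(p);
    Delete d = new Delete(Bytes.toBytes("row2"));
    d.deleteColumns(Bytes.toBytes("f"), Bytes.toBytes("q"));
    mrm.add(d);
    table.mutateRows(mrm); // applied atomically by the hosting region server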
Index: src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java (revision 1239953)
+++ src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java (working copy)
@@ -28,6 +28,7 @@
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
+import org.apache.hadoop.hbase.regionserver.RegionSplitPolicy;
 
 import java.util.Map;
@@ -275,6 +276,21 @@
   public void mutateRow(final RowMutation rm) throws IOException;
 
   /**
+   * Advanced use only.
+   * Performs multiple mutations atomically across multiple rows. Currently
+   * {@link Put} and {@link Delete} are supported.
+   *
+   * NOTE: The caller has to ensure that all rows reside in the same region.
+   * Do not use unless splitting is disabled or a custom {@link RegionSplitPolicy}
+   * has been set up for the involved table.
+   *
+   * @param mrm object that specifies the set of mutations to perform
+   * atomically
+   * @throws IOException if any of the rows involved resides in a different
+   * region.
+   */
+  public void mutateRows(final MultiRowMutation mrm) throws IOException;
+
+  /**
    * Appends values to one or more columns within a single row.
    * <p>
    * This operation does not appear atomic to readers. Appends are done
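[Reviewer note, not part of the patch: the javadoc above leaves the same-region invariant to the caller. One way to honor it is a table-level split policy that never splits; the class below is hypothetical and the wiring is version-dependent.]

    // Hypothetical policy: pins the whole table into a single region.
    public class NeverSplitPolicy extends ConstantSizeRegionSplitPolicy {
      @Override
      protected boolean shouldSplit() {
        return false; // never split, so co-located rows stay co-located
      }
    }

    // Possible wiring at table creation (exact API depends on the HBase version):
    HTableDescriptor htd = new HTableDescriptor("mytable");
    htd.setValue(HTableDescriptor.SPLIT_POLICY, NeverSplitPolicy.class.getName());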
Index: src/main/java/org/apache/hadoop/hbase/client/HTable.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/HTable.java (revision 1239953)
+++ src/main/java/org/apache/hadoop/hbase/client/HTable.java (working copy)
@@ -766,6 +766,26 @@
    * {@inheritDoc}
    */
   @Override
+  public void mutateRows(final MultiRowMutation mrm) throws IOException {
+    if (mrm.getMutations().size() == 0) {
+      throw new IOException("Invalid arguments to mutateRows, " +
+        "at least one mutation must be specified");
+    }
+    new ServerCallable<Void>(connection,
+        tableName,
+        mrm.getMutations().get(0).getRow(),
+        operationTimeout) {
+      public Void call() throws IOException {
+        server.mutateRows(location.getRegionInfo().getRegionName(), mrm);
+        return null;
+      }
+    }.withRetries();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
   public Result append(final Append append) throws IOException {
     if (append.numFamilies() == 0) {
       throw new IOException(
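[Reviewer note, not part of the patch: the ServerCallable above locates the region using only the row of the first mutation, so a row belonging to a different region is rejected server-side with DoNotRetryIOException (see the HRegion hunk below) and withRetries() fails fast rather than retrying. A hedged client-side sketch:]

    try {
      table.mutateRows(mrm);
    } catch (DoNotRetryIOException e) {
      // At least one row lies outside the region hosting the first row;
      // the batch was rejected as a whole and nothing was applied.
      LOG.error("MultiRowMutation spans more than one region", e);
    }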
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1239953)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy)
@@ -38,7 +38,10 @@
 import java.util.NavigableMap;
 import java.util.NavigableSet;
 import java.util.Random;
+import java.util.Set;
+import java.util.SortedSet;
 import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
@@ -77,6 +80,7 @@
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.MultiRowMutation;
 import org.apache.hadoop.hbase.client.RowMutation;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
@@ -4144,12 +4148,41 @@
     return results;
   }
 
-  public void mutateRow(RowMutation rm,
-      Integer lockid) throws IOException {
+  public void mutateRow(RowMutation rm, Integer lockid) throws IOException {
+    internalMutate(rm.getMutations(), Collections.singleton(rm.getRow()), lockid);
+  }
+
+  public void mutateRows(MultiRowMutation mrm)
+      throws IOException {
+    // sort the rows, to avoid deadlocks
+    SortedSet<byte[]> rowsToLock = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+    for (Mutation m : mrm.getMutations()) {
+      if (!rowIsInRange(regionInfo, m.getRow())) {
+        throw new DoNotRetryIOException("Requested row out of range for " +
+          "mutateRows on HRegion " + this + ", startKey='" +
+          Bytes.toStringBinary(regionInfo.getStartKey()) + "', getEndKey()='" +
+          Bytes.toStringBinary(regionInfo.getEndKey()) + "', row='" +
+          Bytes.toStringBinary(m.getRow()) + "'");
+      }
+      rowsToLock.add(m.getRow());
+    }
+    internalMutate(mrm.getMutations(), rowsToLock, null);
+  }
+
+  /**
+   * Perform atomic mutations within the region.
+   * @param mutations The list of mutations to perform.
+   * @param rowsToLock Rows to lock
+   * @param lockId Optionally use this single lock
+   * @throws IOException
+   */
+  private void internalMutate(Collection<Mutation> mutations,
+      Set<byte[]> rowsToLock, Integer lockId) throws IOException {
     boolean flush = false;
+    assert rowsToLock.size() == 1 || lockId == null;
 
     startRegionOperation();
-    Integer lid = null;
+    List<Integer> acquiredLocks = null;
     try {
       // 1. run all pre-hooks before the atomic operation
       // if any pre hook indicates "bypass", bypass the entire operation
 
       // one WALEdit is used for all edits.
       WALEdit walEdit = new WALEdit();
       if (coprocessorHost != null) {
-        for (Mutation m : rm.getMutations()) {
+        for (Mutation m : mutations) {
           if (m instanceof Put) {
             if (coprocessorHost.prePut((Put) m, walEdit, m.getWriteToWAL())) {
               // by pass everything
@@ -4174,8 +4207,17 @@
         }
       }
 
-      // 2. acquire the row lock
-      lid = getLock(lockid, rm.getRow(), true);
+      // 2. acquire the row lock(s)
+      acquiredLocks = new ArrayList<Integer>(rowsToLock.size());
+      for (byte[] row : rowsToLock) {
+        // attempt to lock all involved rows, fail if one lock times out
+        Integer lid = getLock(lockId, row, true);
+        if (lid == null) {
+          throw new IOException("Failed to acquire lock on " +
+            Bytes.toStringBinary(row));
+        }
+        acquiredLocks.add(lid);
+      }
 
       // 3. acquire the region lock
       this.updatesLock.readLock().lock();
@@ -4187,7 +4229,7 @@
       byte[] byteNow = Bytes.toBytes(now);
       try {
         // 5. Check mutations and apply edits to a single WALEdit
-        for (Mutation m : rm.getMutations()) {
+        for (Mutation m : mutations) {
           if (m instanceof Put) {
             Map<byte[], List<KeyValue>> familyMap = m.getFamilyMap();
             checkFamilies(familyMap.keySet());
@@ -4214,7 +4256,7 @@
 
       // 7. apply to memstore
       long addedSize = 0;
-      for (Mutation m : rm.getMutations()) {
+      for (Mutation m : mutations) {
         addedSize += applyFamilyMapToMemstore(m.getFamilyMap(), w);
       }
       flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize));
@@ -4227,7 +4269,7 @@
       }
       // 10. run all coprocessor post hooks, after region lock is released
       if (coprocessorHost != null) {
-        for (Mutation m : rm.getMutations()) {
+        for (Mutation m : mutations) {
           if (m instanceof Put) {
             coprocessorHost.postPut((Put) m, walEdit, m.getWriteToWAL());
           } else if (m instanceof Delete) {
@@ -4236,9 +4278,11 @@
         }
       }
     } finally {
-      if (lid != null) {
+      if (acquiredLocks != null) {
         // 11. release the row lock
-        releaseRowLock(lid);
+        for (Integer lid : acquiredLocks) {
+          releaseRowLock(lid);
+        }
       }
       if (flush) {
         // 12. Flush cache if needed. Do it outside update lock.
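[Reviewer note, not part of the patch: on the deadlock comment in mutateRows() above — every caller acquires its row locks in Bytes.BYTES_COMPARATOR order, so two overlapping mutateRows calls can never each hold a lock the other still needs. A standalone sketch of the idea, with acquireRowLock() as a hypothetical stand-in for HRegion.getLock():]

    SortedSet<byte[]> rows = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
    rows.add(rowB); // insertion order does not matter ...
    rows.add(rowA);
    for (byte[] row : rows) { // ... locking order is always rowA, then rowB
      acquireRowLock(row);    // hypothetical stand-in for HRegion.getLock()
    }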
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1239953)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy)
@@ -83,6 +83,7 @@
 import org.apache.hadoop.hbase.catalog.RootLocationEditor;
 import org.apache.hadoop.hbase.client.Action;
 import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.MultiRowMutation;
 import org.apache.hadoop.hbase.client.RowMutation;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
@@ -3153,11 +3154,32 @@
   }
 
   @Override
+  public void mutateRows(byte[] regionName, MultiRowMutation rm)
+      throws IOException {
+    checkOpen();
+    if (regionName == null) {
+      throw new IOException("Invalid arguments to mutateRows " +
+        "regionName is null");
+    }
+    requestCount.incrementAndGet();
+    try {
+      HRegion region = getRegion(regionName);
+      if (!region.getRegionInfo().isMetaTable()) {
+        this.cacheFlusher.reclaimMemStoreMemory();
+      }
+      region.mutateRows(rm);
+    } catch (IOException e) {
+      checkFileSystem();
+      throw e;
+    }
+  }
+
+  @Override
   public void mutateRow(byte[] regionName, RowMutation rm)
       throws IOException {
     checkOpen();
     if (regionName == null) {
-      throw new IOException("Invalid arguments to atomicMutation " +
+      throw new IOException("Invalid arguments to mutateRow " +
        "regionName is null");
     }
     requestCount.incrementAndGet();
Index: src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java (revision 1239953)
+++ src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java (working copy)
@@ -506,6 +506,11 @@
     public void mutateRow(RowMutation rm) throws IOException {
       table.mutateRow(rm);
     }
+
+    @Override
+    public void mutateRows(MultiRowMutation rm) throws IOException {
+      table.mutateRows(rm);
+    }
   }
 
   /** The coprocessor */
Index: src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java (revision 1239953)
+++ src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java (working copy)
@@ -48,6 +48,7 @@
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.MultiRowMutation;
 import org.apache.hadoop.hbase.client.RowMutation;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
@@ -248,6 +249,7 @@
 
     addToMap(Append.class, code++);
     addToMap(RowMutation.class, code++);
+    addToMap(MultiRowMutation.class, code++);
   }
 
   private Class<?> declaredClass;
Index: src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (revision 1239953)
+++ src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (working copy)
@@ -29,6 +29,7 @@
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.MultiRowMutation;
 import org.apache.hadoop.hbase.client.RowMutation;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
@@ -266,6 +267,9 @@
   public void mutateRow(byte[] regionName, RowMutation rm)
       throws IOException;
 
+  public void mutateRows(byte[] regionName, MultiRowMutation mrm)
+      throws IOException;
+
   /**
    * Appends values to one or more columns values in a row. Optionally
    * Returns the updated keys after the append.
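[Reviewer note, not part of the patch: since MultiRowMutation travels over RPC via HbaseObjectWritable (see the code registration above), its Writable round trip can be sanity-checked with the existing org.apache.hadoop.hbase.util.Writables utility; a minimal sketch:]

    MultiRowMutation mrm = new MultiRowMutation();
    Put p = new Put(Bytes.toBytes("r"));
    p.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
    mrm.add(p);
    // write(): version byte, mutation count, then each mutation via
    // HbaseObjectWritable; readFields() reverses the same layout.
    byte[] bytes = Writables.getBytes(mrm);
    MultiRowMutation copy = (MultiRowMutation)
        Writables.getWritable(bytes, new MultiRowMutation());
    assertEquals(1, copy.getMutations().size());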