Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java (revision 1232990)
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java (working copy)
@@ -19,6 +19,8 @@
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -29,11 +31,13 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.RegionMutation;
 import org.apache.hadoop.hbase.client.RowMutation;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
@@ -246,7 +250,7 @@
   }
 
   /**
-   * Test multi-threaded increments.
+   * Test multi-threaded row mutations.
    */
  public void testRowMutationMultiThreads() throws IOException {
 
@@ -263,7 +267,52 @@
     AtomicInteger failures = new AtomicInteger(0);
     // create all threads
     for (int i = 0; i < numThreads; i++) {
-      all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures);
+      all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures) {
+        @Override
+        public void run() {
+          boolean op = true;
+          for (int i = 0; i < numOps; i++) {
+            try {
+              // throw in some flushes
+              if (r.nextFloat() < 0.001) {
+                LOG.debug("flushing");
+                region.flushcache();
+              }
+              long ts = timeStamps.incrementAndGet();
+              RegionMutation rm = new RegionMutation();
+              if (op) {
+                Put p = new Put(row, ts);
+                p.add(fam1, qual1, value1);
+                rm.add(p);
+                Delete d = new Delete(row);
+                d.deleteColumns(fam1, qual2, ts);
+                rm.add(d);
+              } else {
+                Delete d = new Delete(row);
+                d.deleteColumns(fam1, qual1, ts);
+                rm.add(d);
+                Put p = new Put(row, ts);
+                p.add(fam1, qual2, value2);
+                rm.add(p);
+              }
+              region.mutateRegion(rm);
+              op ^= true;
+              // check: should always see exactly one column
+              Scan s = new Scan(row);
+              RegionScanner rs = region.getScanner(s);
+              List<KeyValue> r = new ArrayList<KeyValue>();
+              while (rs.next(r));
+              if (r.size() != 1) {
+                LOG.debug(r);
+                failures.incrementAndGet();
+                fail();
+              }
+            } catch (IOException e) {
+              e.printStackTrace();
+              failures.incrementAndGet();
+              fail();
+            }
+          }
+        }
+      };
     }
+
+    // run all threads
+    for (int i = 0; i < numThreads; i++) {
+      all[i].start();
+    }
+
+    // wait for all threads to finish
+    for (int i = 0; i < numThreads; i++) {
+      try {
+        all[i].join();
+      } catch (InterruptedException e) {
+      }
+    }
+    assertEquals(0, failures.get());
+  }
 
   public static class AtomicOperation extends Thread {
-    private final HRegion region;
-    private final int numOps;
-    private final AtomicLong timeStamps;
-    private final AtomicInteger failures;
-    private final Random r = new Random();
-    public AtomicOperation(HRegion region, int numOps, AtomicLong timeStamps, AtomicInteger failures) {
+    protected final HRegion region;
+    protected final int numOps;
+    protected final AtomicLong timeStamps;
+    protected final AtomicInteger failures;
+    protected final Random r = new Random();
+
+    public AtomicOperation(HRegion region, int numOps, AtomicLong timeStamps,
+        AtomicInteger failures) {
      this.region = region;
      this.numOps = numOps;
      this.timeStamps = timeStamps;
      this.failures = failures;
    }
-    @Override
-    public void run() {
-      boolean op = true;
-      for (int i = 0; i < numOps; i++) {
-        try {
-          // throw in some flushes
-          if (r.nextFloat() < 0.001) {
-            LOG.debug("flushing");
-            region.flushcache();
-          }
-          long ts = timeStamps.incrementAndGet();
-          RowMutation arm = new RowMutation(row);
-          if (op) {
-            Put p = new Put(row, ts);
-            p.add(fam1, qual1, value1);
-            arm.add(p);
-            Delete d = new Delete(row);
-            d.deleteColumns(fam1, qual2, ts);
-            arm.add(d);
-          } else {
-            Delete d = new Delete(row);
-            d.deleteColumns(fam1, qual1, ts);
-            arm.add(d);
-            Put p = new Put(row, ts);
-            p.add(fam1, qual2, value2);
-            arm.add(p);
-          }
-          region.mutateRow(arm, null);
-          op ^= true;
-          // check: should always see exactly one column
-          Get g = new Get(row);
-          Result r = region.get(g, null);
-          if (r.size() != 1) {
-            LOG.debug(r);
-            failures.incrementAndGet();
-            fail();
-          }
-        } catch (IOException e) {
-          e.printStackTrace();
-          failures.incrementAndGet();
-          fail();
-        }
-      }
-    }
   }
 }
Index: src/main/java/org/apache/hadoop/hbase/client/RowMutation.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/RowMutation.java (revision 1232990)
+++ src/main/java/org/apache/hadoop/hbase/client/RowMutation.java (working copy)
@@ -20,13 +20,9 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
 
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.io.HbaseObjectWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
@@ -36,51 +32,36 @@
  * The mutations are performed in the order in which they
  * were added.
  */
-public class RowMutation implements Row {
-  private List<Mutation> mutations = new ArrayList<Mutation>();
+public class RowMutation extends RegionMutation implements Row {
   private byte [] row;
   private static final byte VERSION = (byte)0;
 
   /** Constructor for Writable. DO NOT USE */
-  public RowMutation() {}
+  public RowMutation() {
+    super();
+  }
 
   /**
    * Create an atomic mutation for the specified row.
    * @param row row key
    */
   public RowMutation(byte [] row) {
+    super();
     if(row == null || row.length > HConstants.MAX_ROW_LENGTH) {
       throw new IllegalArgumentException("Row key is invalid");
     }
     this.row = Arrays.copyOf(row, row.length);
   }
 
-  /**
-   * Add a {@link Put} operation to the list of mutations
-   * @param p The {@link Put} to add
-   * @throws IOException
-   */
-  public void add(Put p) throws IOException {
-    internalAdd(p);
-  }
-
-  /**
-   * Add a {@link Delete} operation to the list of mutations
-   * @param d The {@link Delete} to add
-   * @throws IOException
-   */
-  public void add(Delete d) throws IOException {
-    internalAdd(d);
-  }
-
-  private void internalAdd(Mutation m) throws IOException {
+  @Override
+  protected void internalAdd(Mutation m) throws IOException {
     int res = Bytes.compareTo(this.row, m.getRow());
     if(res != 0) {
       throw new IOException("The row in the recently added Put/Delete " +
          Bytes.toStringBinary(m.getRow()) + " doesn't match the original one " +
          Bytes.toStringBinary(this.row));
     }
-    mutations.add(m);
+    super.internalAdd(m);
   }
 
   @Override
@@ -90,21 +71,14 @@
       throw new IOException("version not supported");
     }
     this.row = Bytes.readByteArray(in);
-    int numMutations = in.readInt();
-    mutations.clear();
-    for(int i = 0; i < numMutations; i++) {
-      mutations.add((Mutation) HbaseObjectWritable.readObject(in, null));
-    }
+    super.readFields(in);
   }
 
   @Override
   public void write(final DataOutput out) throws IOException {
     out.writeByte(VERSION);
     Bytes.writeByteArray(out, this.row);
-    out.writeInt(mutations.size());
-    for (Mutation m : mutations) {
-      HbaseObjectWritable.writeObject(out, m, m.getClass(), null);
-    }
+    super.write(out);
  }
 
  @Override
@@ -116,11 +90,4 @@
   public byte[] getRow() {
     return row;
   }
-
-  /**
-   * @return An unmodifiable list of the current mutations.
-   */
-  public List<Mutation> getMutations() {
-    return Collections.unmodifiableList(mutations);
-  }
 }
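A quick sketch of how the single-row API reads after this refactoring (table, family, and qualifier names below are made up for illustration; t is an open HTable). RowMutation keeps its row-consistency check, but the mutation-list bookkeeping and the Writable plumbing now come from RegionMutation:

    // Put and Delete applied atomically under the single row lock.
    RowMutation rm = new RowMutation(Bytes.toBytes("row1"));
    Put p = new Put(Bytes.toBytes("row1"));
    p.add(Bytes.toBytes("fam"), Bytes.toBytes("qualA"), Bytes.toBytes("value"));
    rm.add(p);
    Delete d = new Delete(Bytes.toBytes("row1"));
    d.deleteColumns(Bytes.toBytes("fam"), Bytes.toBytes("qualB"));
    rm.add(d);
    t.mutateRow(rm);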
Index: src/main/java/org/apache/hadoop/hbase/client/HTable.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/HTable.java (revision 1232990)
+++ src/main/java/org/apache/hadoop/hbase/client/HTable.java (working copy)
@@ -766,6 +766,26 @@
    * {@inheritDoc}
    */
   @Override
+  public void mutateRegion(final RegionMutation rm) throws IOException {
+    if (rm.getMutations().size() == 0) {
+      throw new IOException("Invalid arguments to mutateRegion, " +
+          "at least one mutation must be specified");
+    }
+    new ServerCallable<Void>(connection,
+        tableName,
+        rm.getMutations().get(0).getRow(),
+        operationTimeout) {
+      public Void call() throws IOException {
+        server.mutateRegion(location.getRegionInfo().getRegionName(), rm);
+        return null;
+      }
+    }.withRetries();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
   public Result append(final Append append) throws IOException {
     if (append.numFamilies() == 0) {
       throw new IOException(
Index: src/main/java/org/apache/hadoop/hbase/client/RegionMutation.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/RegionMutation.java (revision 0)
+++ src/main/java/org/apache/hadoop/hbase/client/RegionMutation.java (revision 0)
@@ -0,0 +1,78 @@
+package org.apache.hadoop.hbase.client;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.HbaseObjectWritable;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * For advanced use only.
+ * Performs multiple mutations atomically within a single region.
+ * The caller has to ensure that all rows involved reside in the same region.
+ *
+ * Currently {@link Put} and {@link Delete} are supported.
+ */
+public class RegionMutation implements Writable {
+  protected List<Mutation> mutations = new ArrayList<Mutation>();
+  private static final byte VERSION = (byte)0;
+
+  public RegionMutation() {}
+
+  /**
+   * Add a {@link Put} operation to the list of mutations
+   * @param p The {@link Put} to add
+   * @throws IOException
+   */
+  public void add(Put p) throws IOException {
+    internalAdd(p);
+  }
+
+  /**
+   * Add a {@link Delete} operation to the list of mutations
+   * @param d The {@link Delete} to add
+   * @throws IOException
+   */
+  public void add(Delete d) throws IOException {
+    internalAdd(d);
+  }
+
+  protected void internalAdd(Mutation m) throws IOException {
+    // TODO: check single region condition on the client?
+    // (check on the server is cheap)
+    mutations.add(m);
+  }
+
+  @Override
+  public void readFields(final DataInput in) throws IOException {
+    int version = in.readByte();
+    if (version > VERSION) {
+      throw new IOException("version not supported");
+    }
+    int numMutations = in.readInt();
+    mutations.clear();
+    for(int i = 0; i < numMutations; i++) {
+      mutations.add((Mutation) HbaseObjectWritable.readObject(in, null));
+    }
+  }
+
+  @Override
+  public void write(final DataOutput out) throws IOException {
+    out.writeByte(VERSION);
+    out.writeInt(mutations.size());
+    for (Mutation m : mutations) {
+      HbaseObjectWritable.writeObject(out, m, m.getClass(), null);
+    }
+  }
+
+  /**
+   * @return An unmodifiable list of the current mutations.
+   */
+  public List<Mutation> getMutations() {
+    return Collections.unmodifiableList(mutations);
+  }
+}
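Client-side usage of the new class would look roughly like the sketch below. This assumes HTableInterface also declares mutateRegion() (implied by the @Override/{@inheritDoc} in the HTable hunk above, though that hunk is not part of this excerpt); the names are hypothetical, and the caller must arrange for the rows to live in the same region, e.g. through key design or pre-splitting:

    HTable table = new HTable(conf, "mytable");
    RegionMutation rm = new RegionMutation();
    Put p = new Put(Bytes.toBytes("user1-index"));
    p.add(Bytes.toBytes("fam"), Bytes.toBytes("qual"), Bytes.toBytes("value"));
    rm.add(p);
    Delete d = new Delete(Bytes.toBytes("user1-data"));
    d.deleteColumns(Bytes.toBytes("fam"), Bytes.toBytes("qual"));
    rm.add(d);
    // routed via the region hosting the first mutation's row; the server
    // fails the call if any row falls outside that region
    table.mutateRegion(rm);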
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1232990)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy)
@@ -38,7 +38,11 @@
 import java.util.NavigableMap;
 import java.util.NavigableSet;
 import java.util.Random;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.SortedSet;
 import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
@@ -77,6 +81,7 @@
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.RegionMutation;
 import org.apache.hadoop.hbase.client.RowMutation;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
@@ -4144,12 +4149,41 @@
     return results;
   }
 
-  public void mutateRow(RowMutation rm,
-      Integer lockid) throws IOException {
+  public void mutateRow(RowMutation rm, Integer lockid) throws IOException {
+    mutate(rm.getMutations(), Collections.singleton(rm.getRow()), lockid);
+  }
+
+  public void mutateRegion(RegionMutation rm)
+      throws IOException {
+    // sort the rows, to avoid deadlocks
+    SortedSet<byte[]> rowsToLock = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+    for (Mutation m : rm.getMutations()) {
+      if (!rowIsInRange(regionInfo, m.getRow())) {
+        throw new DoNotRetryIOException("Requested row out of range for " +
+            "mutateRegion on HRegion " + this + ", startKey='" +
+            Bytes.toStringBinary(regionInfo.getStartKey()) + "', getEndKey()='" +
+            Bytes.toStringBinary(regionInfo.getEndKey()) + "', row='" +
+            Bytes.toStringBinary(m.getRow()) + "'");
+      }
+      rowsToLock.add(m.getRow());
+    }
+    mutate(rm.getMutations(), rowsToLock, null);
+  }
+
+  /**
+   * Perform atomic mutations within the region.
+   * @param mutations The list of mutations to perform.
+   * @param rowsToLock Rows to lock
+   * @param lockId Optionally use this single lock
+   * @throws IOException
+   */
+  private void mutate(Collection<Mutation> mutations,
+      Set<byte[]> rowsToLock, Integer lockId) throws IOException {
     boolean flush = false;
+    assert rowsToLock.size() == 1 || lockId == null;
 
     startRegionOperation();
-    Integer lid = null;
+    List<Integer> acquiredLocks = null;
     try {
       // 1. run all pre-hooks before the atomic operation
       // if any pre hook indicates "bypass", bypass the entire operation
@@ -4157,7 +4191,7 @@
       // one WALEdit is used for all edits.
       WALEdit walEdit = new WALEdit();
       if (coprocessorHost != null) {
-        for (Mutation m : rm.getMutations()) {
+        for (Mutation m : mutations) {
           if (m instanceof Put) {
             if (coprocessorHost.prePut((Put) m, walEdit, m.getWriteToWAL())) {
               // by pass everything
@@ -4174,8 +4208,16 @@
         }
       }
 
-      // 2. acquire the row lock
-      lid = getLock(lockid, rm.getRow(), true);
+      // 2. acquire the row lock(s)
+      acquiredLocks = Lists.newArrayListWithCapacity(mutations.size());
+      for (byte[] row : rowsToLock) {
+        Integer lid = getLock(lockId, row, true);
+        if (lid == null) {
+          throw new IOException("Failed to acquire lock on " +
+              Bytes.toStringBinary(row));
+        }
+        acquiredLocks.add(lid);
+      }
 
       // 3. acquire the region lock
       this.updatesLock.readLock().lock();
@@ -4187,7 +4229,7 @@
       byte[] byteNow = Bytes.toBytes(now);
       try {
         // 5. Check mutations and apply edits to a single WALEdit
-        for (Mutation m : rm.getMutations()) {
+        for (Mutation m : mutations) {
           if (m instanceof Put) {
             Map<byte[], List<KeyValue>> familyMap = m.getFamilyMap();
             checkFamilies(familyMap.keySet());
@@ -4214,7 +4256,7 @@
 
       // 7. apply to memstore
       long addedSize = 0;
-      for (Mutation m : rm.getMutations()) {
+      for (Mutation m : mutations) {
         addedSize += applyFamilyMapToMemstore(m.getFamilyMap(), w);
       }
       flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize));
@@ -4227,7 +4269,7 @@
       }
       // 10. run all coprocessor post hooks, after region lock is released
       if (coprocessorHost != null) {
-        for (Mutation m : rm.getMutations()) {
+        for (Mutation m : mutations) {
           if (m instanceof Put) {
             coprocessorHost.postPut((Put) m, walEdit, m.getWriteToWAL());
           } else if (m instanceof Delete) {
@@ -4236,9 +4278,11 @@
         }
       }
     } finally {
-      if (lid != null) {
+      if (acquiredLocks != null) {
         // 11. release the row lock
-        releaseRowLock(lid);
+        for (Integer lid : acquiredLocks) {
+          releaseRowLock(lid);
+        }
       }
       if (flush) {
         // 12. Flush cache if needed. Do it outside update lock.
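The TreeSet with Bytes.BYTES_COMPARATOR above is the classic deadlock-avoidance idiom: every mutateRegion() call acquires its row locks in one global byte order, so two concurrent calls over overlapping row sets can never each hold a lock the other is waiting on. A toy illustration of the idea (lockRow() is a hypothetical stand-in for getLock(null, row, true), not HBase API):

    // Whatever order the caller added the mutations in, both threads
    // end up locking rowA before rowB, so neither blocks while holding
    // a lock the other needs.
    SortedSet<byte[]> rows = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
    rows.add(rowB);  // deliberately added "out of order"
    rows.add(rowA);
    for (byte[] row : rows) {
      lockRow(row);
    }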
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1232990)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy)
@@ -83,6 +83,7 @@
 import org.apache.hadoop.hbase.catalog.RootLocationEditor;
 import org.apache.hadoop.hbase.client.Action;
 import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.RegionMutation;
 import org.apache.hadoop.hbase.client.RowMutation;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
@@ -3153,11 +3154,32 @@
   }
 
   @Override
+  public void mutateRegion(byte[] regionName, RegionMutation rm)
+      throws IOException {
+    checkOpen();
+    if (regionName == null) {
+      throw new IOException("Invalid arguments to mutateRegion " +
+          "regionName is null");
+    }
+    requestCount.incrementAndGet();
+    try {
+      HRegion region = getRegion(regionName);
+      if (!region.getRegionInfo().isMetaTable()) {
+        this.cacheFlusher.reclaimMemStoreMemory();
+      }
+      region.mutateRegion(rm);
+    } catch (IOException e) {
+      checkFileSystem();
+      throw e;
+    }
+  }
+
+  @Override
   public void mutateRow(byte[] regionName, RowMutation rm)
       throws IOException {
     checkOpen();
     if (regionName == null) {
-      throw new IOException("Invalid arguments to atomicMutation " +
+      throw new IOException("Invalid arguments to mutateRow " +
          "regionName is null");
     }
     requestCount.incrementAndGet();
Index: src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java (revision 1232990)
+++ src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java (working copy)
@@ -506,6 +506,11 @@
     public void mutateRow(RowMutation rm) throws IOException {
       table.mutateRow(rm);
     }
+
+    @Override
+    public void mutateRegion(RegionMutation rm) throws IOException {
+      table.mutateRegion(rm);
+    }
   }
 
   /** The coprocessor */
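With the wrapper above, a coprocessor can reach the new call through the HTableInterface handed out by its environment. A rough sketch (the table name is hypothetical, and this again assumes HTableInterface declares mutateRegion() as the @Override implies):

    // e.g. inside an observer hook; env is the coprocessor environment
    HTableInterface t = env.getTable(Bytes.toBytes("othertable"));
    RegionMutation rm = new RegionMutation();
    Put p = new Put(Bytes.toBytes("row1"));
    p.add(Bytes.toBytes("fam"), Bytes.toBytes("qual"), Bytes.toBytes("value"));
    rm.add(p);
    t.mutateRegion(rm);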
Index: src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java (revision 1232990)
+++ src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java (working copy)
@@ -48,6 +48,7 @@
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.RegionMutation;
 import org.apache.hadoop.hbase.client.RowMutation;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
@@ -248,6 +249,7 @@
 
     addToMap(Append.class, code++);
     addToMap(RowMutation.class, code++);
+    addToMap(RegionMutation.class, code++);
   }
 
   private Class<?> declaredClass;
Index: src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (revision 1232990)
+++ src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (working copy)
@@ -29,6 +29,7 @@
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.RegionMutation;
 import org.apache.hadoop.hbase.client.RowMutation;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
@@ -266,6 +267,9 @@
   public void mutateRow(byte[] regionName, RowMutation rm)
       throws IOException;
 
+  public void mutateRegion(byte[] regionName, RegionMutation rm)
+      throws IOException;
+
   /**
    * Appends values to one or more columns values in a row. Optionally
    * Returns the updated keys after the append.
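Registering RegionMutation in HbaseObjectWritable gives it a compact one-byte type code on the wire, which the new HRegionInterface.mutateRegion() RPC relies on; an unregistered Writable would instead be written with its full class name. A minimal round-trip sketch of the Writable contract, assuming an already-populated RegionMutation rm (java.io stream wrappers; a null Configuration is accepted by these calls):

    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    HbaseObjectWritable.writeObject(new DataOutputStream(bos), rm,
        RegionMutation.class, null);
    DataInputStream in =
        new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
    RegionMutation copy =
        (RegionMutation) HbaseObjectWritable.readObject(in, null);
    assert copy.getMutations().size() == rm.getMutations().size();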