Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java	(revision 1231244)
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java	(working copy)
@@ -19,6 +19,9 @@
 import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -26,10 +29,13 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.RowMutation;
+import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
 import org.junit.experimental.categories.Category;
 
@@ -239,8 +245,99 @@
     }
   }
 
+  /**
+   * Test multi-threaded row mutations.
+   */
+  public void testRowMutationMultiThreads() throws IOException {
+    LOG.info("Starting test testRowMutationMultiThreads");
+    initHRegion(tableName, getName(), fam1);
+
+    // create 100 threads, each will alternate between adding and
+    // removing a column
+    int numThreads = 100;
+    int opsPerThread = 1000;
+    AtomicOperation[] all = new AtomicOperation[numThreads];
+
+    AtomicLong timeStamps = new AtomicLong(0);
+    AtomicInteger failures = new AtomicInteger(0);
+    // create all threads
+    for (int i = 0; i < numThreads; i++) {
+      all[i] = new AtomicOperation(region, opsPerThread, timeStamps, failures);
+    }
+
+    // run all threads
+    for (int i = 0; i < numThreads; i++) {
+      all[i].start();
+    }
+
+    // wait for all threads to finish
+    for (int i = 0; i < numThreads; i++) {
+      try {
+        all[i].join();
+      } catch (InterruptedException e) {
+      }
+    }
+    assertEquals(0, failures.get());
+  }
+
+  public static class AtomicOperation extends Thread {
+    private final HRegion region;
+    private final int numOps;
+    private final AtomicLong timeStamps;
+    private final AtomicInteger failures;
+    private final Random r = new Random();
+
+    public AtomicOperation(HRegion region, int numOps, AtomicLong timeStamps,
+        AtomicInteger failures) {
+      this.region = region;
+      this.numOps = numOps;
+      this.timeStamps = timeStamps;
+      this.failures = failures;
+    }
+
+    @Override
+    public void run() {
+      boolean op = true;
+      for (int i = 0; i

 * This operation does not appear atomic to readers. Appends are done
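[Reviewer note, not part of the patch: the body of the run() loop above is truncated in this excerpt. Below is a minimal sketch of what one alternating iteration could look like; the field names (row, fam1, qual1, qual2, value1, value2) and the verification step are illustrative assumptions, not the patch's actual code.]

      // Hypothetical loop body, assuming the test class defines row, fam1,
      // qual1/qual2 and value1/value2. Each iteration atomically swaps which
      // of the two qualifiers holds a value, so a concurrent reader should
      // always see exactly one column in the row.
      long ts = timeStamps.incrementAndGet();
      RowMutation rm = new RowMutation(row);
      if (op) {
        Put p = new Put(row, ts);
        p.add(fam1, qual1, value1);
        rm.add(p);
        Delete d = new Delete(row);
        d.deleteColumns(fam1, qual2, ts);
        rm.add(d);
      } else {
        Delete d = new Delete(row);
        d.deleteColumns(fam1, qual1, ts);
        rm.add(d);
        Put p = new Put(row, ts);
        p.add(fam1, qual2, value2);
        rm.add(p);
      }
      region.mutateRow(rm, null);
      op ^= true;  // alternate between the two states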
Index: src/main/java/org/apache/hadoop/hbase/client/RowMutation.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/RowMutation.java	(revision 0)
+++ src/main/java/org/apache/hadoop/hbase/client/RowMutation.java	(revision 0)
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.io.HbaseObjectWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Performs multiple mutations atomically on a single row.
+ * Currently {@link Put} and {@link Delete} are supported.
+ *
+ * The mutations are performed in the order in which they
+ * were added.
+ */
+public class RowMutation implements Row {
+  private List<Mutation> mutations = new ArrayList<Mutation>();
+  private byte [] row;
+  private static final byte VERSION = (byte)0;
+
+  /** Constructor for Writable. DO NOT USE */
+  public RowMutation() {}
+
+  /**
+   * Create an atomic mutation for the specified row.
+   * @param row row key
+   */
+  public RowMutation(byte [] row) {
+    if(row == null || row.length > HConstants.MAX_ROW_LENGTH) {
+      throw new IllegalArgumentException("Row key is invalid");
+    }
+    this.row = Arrays.copyOf(row, row.length);
+  }
+
+  /**
+   * Add a {@link Put} operation to the list of mutations
+   * @param p The {@link Put} to add
+   * @throws IOException
+   */
+  public void add(Put p) throws IOException {
+    internalAdd(p);
+  }
+
+  /**
+   * Add a {@link Delete} operation to the list of mutations
+   * @param d The {@link Delete} to add
+   * @throws IOException
+   */
+  public void add(Delete d) throws IOException {
+    internalAdd(d);
+  }
+
+  private void internalAdd(Mutation m) throws IOException {
+    int res = Bytes.compareTo(this.row, m.getRow());
+    if(res != 0) {
+      throw new IOException("The row in the recently added Put/Delete " +
+          Bytes.toStringBinary(m.getRow()) + " doesn't match the original one " +
+          Bytes.toStringBinary(this.row));
+    }
+    mutations.add(m);
+  }
+
+  @Override
+  public void readFields(final DataInput in) throws IOException {
+    int version = in.readByte();
+    if (version > VERSION) {
+      throw new IOException("version not supported");
+    }
+    this.row = Bytes.readByteArray(in);
+    int numMutations = in.readInt();
+    mutations.clear();
+    for(int i = 0; i < numMutations; i++) {
+      mutations.add((Mutation) HbaseObjectWritable.readObject(in, null));
+    }
+  }
+
+  @Override
+  public void write(final DataOutput out) throws IOException {
+    out.writeByte(VERSION);
+    Bytes.writeByteArray(out, this.row);
+    out.writeInt(mutations.size());
+    for (Mutation m : mutations) {
+      HbaseObjectWritable.writeObject(out, m, m.getClass(), null);
+    }
+  }
+
+  @Override
+  public int compareTo(Row i) {
+    return Bytes.compareTo(this.getRow(), i.getRow());
+  }
+
+  @Override
+  public byte[] getRow() {
+    return row;
+  }
+
+  /**
+   * @return An unmodifiable list of the current mutations.
+   */
+  public List<Mutation> getMutations() {
+    return Collections.unmodifiableList(mutations);
+  }
+}
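[Reviewer note, not part of the patch: a minimal client-side sketch of the new API. The table and column names are assumptions; only RowMutation, its add() methods, and HTable.mutateRow() come from the patch.]

    // Both edits below are applied under one row lock and become visible
    // to readers atomically, in the order they were added.
    byte[] row = Bytes.toBytes("row1");
    RowMutation rm = new RowMutation(row);

    Put p = new Put(row);
    p.add(Bytes.toBytes("cf"), Bytes.toBytes("a"), Bytes.toBytes("v"));
    rm.add(p);                  // applied first

    Delete d = new Delete(row);
    d.deleteColumns(Bytes.toBytes("cf"), Bytes.toBytes("b"));
    rm.add(d);                  // applied second; a different row key
                                // would throw IOException

    table.mutateRow(rm);        // assumes an open HTable named "table"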
Index: src/main/java/org/apache/hadoop/hbase/client/HTable.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/HTable.java	(revision 1231244)
+++ src/main/java/org/apache/hadoop/hbase/client/HTable.java	(working copy)
@@ -752,6 +752,20 @@
    * {@inheritDoc}
    */
   @Override
+  public void mutateRow(final RowMutation rm) throws IOException {
+    new ServerCallable<Void>(connection, tableName, rm.getRow(),
+        operationTimeout) {
+      public Void call() throws IOException {
+        server.mutateRow(location.getRegionInfo().getRegionName(), rm);
+        return null;
+      }
+    }.withRetries();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
   public Result append(final Append append) throws IOException {
     if (append.numFamilies() == 0) {
       throw new IOException(
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java	(revision 1231244)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java	(working copy)
@@ -77,10 +77,12 @@
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.RowMutation;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.IsolationLevel;
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Row;
@@ -1684,7 +1686,7 @@
     try {
       // All edits for the given row (across all column families) must happen atomically.
       prepareDelete(delete);
-      internalDelete(delete, delete.getClusterId(), writeToWAL);
+      internalDelete(delete, delete.getClusterId(), writeToWAL, null, null);
     } finally {
       if(lockid == null) releaseRowLock(lid);
     }
@@ -1705,21 +1707,26 @@
     delete.setFamilyMap(familyMap);
     delete.setClusterId(clusterId);
     delete.setWriteToWAL(writeToWAL);
-    internalDelete(delete, clusterId, writeToWAL);
+    internalDelete(delete, clusterId, writeToWAL, null, null);
   }
 
   /**
+   * @param delete The Delete command
    * @param familyMap map of family to edits for the given family.
    * @param writeToWAL
+   * @param writeEntry Optional mvcc write point to use
+   * @param walEdit Optional walEdit to use. A non-null walEdit indicates
+   * that the coprocessor hooks are run by the caller
    * @throws IOException
   */
   private void internalDelete(Delete delete, UUID clusterId,
-      boolean writeToWAL) throws IOException {
+      boolean writeToWAL, MultiVersionConsistencyControl.WriteEntry writeEntry,
+      WALEdit walEdit) throws IOException {
     Map<byte [], List<KeyValue>> familyMap = delete.getFamilyMap();
-    WALEdit walEdit = new WALEdit();
+    WALEdit localWalEdit = walEdit == null ? new WALEdit() : walEdit;
     /* Run coprocessor pre hook outside of locks to avoid deadlock */
-    if (coprocessorHost != null) {
-      if (coprocessorHost.preDelete(delete, walEdit, writeToWAL)) {
+    if (coprocessorHost != null && walEdit == null) {
+      if (coprocessorHost.preDelete(delete, localWalEdit, writeToWAL)) {
         return;
       }
     }
@@ -1783,23 +1790,22 @@
       //
       // bunch up all edits across all column families into a
       // single WALEdit.
-      addFamilyMapToWALEdit(familyMap, walEdit);
+      addFamilyMapToWALEdit(familyMap, localWalEdit);
       this.log.append(regionInfo, this.htableDescriptor.getName(),
-          walEdit, clusterId, now, this.htableDescriptor);
+          localWalEdit, clusterId, now, this.htableDescriptor);
     }
 
     // Now make changes to the memstore.
-    long addedSize = applyFamilyMapToMemstore(familyMap, null);
+    long addedSize = applyFamilyMapToMemstore(familyMap, writeEntry);
     flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize));
-      if (coprocessorHost != null) {
-        coprocessorHost.postDelete(delete, walEdit, writeToWAL);
-      }
     } finally {
       this.updatesLock.readLock().unlock();
     }
 
-    // do after lock
+    if (coprocessorHost != null && walEdit == null) {
+      coprocessorHost.postDelete(delete, localWalEdit, writeToWAL);
+    }
     final long after = EnvironmentEdgeManager.currentTimeMillis();
     final String metricPrefix = SchemaMetrics.generateSchemaMetricsPrefix(
         getTableDesc().getNameAsString(), familyMap.keySet());
@@ -1870,7 +1876,7 @@
     try {
       // All edits for the given row (across all column families) must happen atomically.
-      internalPut(put, put.getClusterId(), writeToWAL);
+      internalPut(put, put.getClusterId(), writeToWAL, null, null);
     } finally {
       if(lockid == null) releaseRowLock(lid);
     }
@@ -2299,11 +2305,13 @@
       // originating cluster. A slave cluster receives the result as a Put
       // or Delete
       if (isPut) {
-        internalPut(((Put)w), HConstants.DEFAULT_CLUSTER_ID, writeToWAL);
+        internalPut(((Put) w), HConstants.DEFAULT_CLUSTER_ID, writeToWAL,
+            null, null);
       } else {
         Delete d = (Delete)w;
         prepareDelete(d);
-        internalDelete(d, HConstants.DEFAULT_CLUSTER_ID, writeToWAL);
+        internalDelete(d, HConstants.DEFAULT_CLUSTER_ID, writeToWAL, null,
+            null);
       }
       return true;
     }
@@ -2398,7 +2406,7 @@
     p.setFamilyMap(familyMap);
     p.setClusterId(HConstants.DEFAULT_CLUSTER_ID);
     p.setWriteToWAL(true);
-    this.internalPut(p, HConstants.DEFAULT_CLUSTER_ID, true);
+    this.internalPut(p, HConstants.DEFAULT_CLUSTER_ID, true, null, null);
   }
 
   /**
@@ -2406,15 +2414,18 @@
    * Warning: Assumption is caller has lock on passed in row.
    * @param put The Put command
    * @param writeToWAL if true, then we should write to the log
+   * @param writeEntry Optional mvcc write point to use
+   * @param walEdit Optional walEdit to use. A non-null walEdit indicates
+   * that the coprocessor hooks are run by the caller
    * @throws IOException
   */
-  private void internalPut(Put put, UUID clusterId,
-      boolean writeToWAL) throws IOException {
+  private void internalPut(Put put, UUID clusterId, boolean writeToWAL,
+      MultiVersionConsistencyControl.WriteEntry writeEntry, WALEdit walEdit)
+      throws IOException {
     Map<byte [], List<KeyValue>> familyMap = put.getFamilyMap();
-    WALEdit walEdit = new WALEdit();
+    WALEdit localWalEdit = walEdit == null ? new WALEdit() : walEdit;
     /* run pre put hook outside of lock to avoid deadlock */
-    if (coprocessorHost != null) {
-      if (coprocessorHost.prePut(put, walEdit, writeToWAL)) {
+    if (coprocessorHost != null && walEdit == null) {
+      if (coprocessorHost.prePut(put, localWalEdit, writeToWAL)) {
         return;
       }
     }
@@ -2434,19 +2445,19 @@
       // for some reason fail to write/sync to commit log, the memstore
       // will contain uncommitted transactions.
     if (writeToWAL) {
-      addFamilyMapToWALEdit(familyMap, walEdit);
+      addFamilyMapToWALEdit(familyMap, localWalEdit);
       this.log.append(regionInfo, this.htableDescriptor.getName(),
-          walEdit, clusterId, now, this.htableDescriptor);
+          localWalEdit, clusterId, now, this.htableDescriptor);
     }
 
-    long addedSize = applyFamilyMapToMemstore(familyMap, null);
+    long addedSize = applyFamilyMapToMemstore(familyMap, writeEntry);
     flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize));
     } finally {
       this.updatesLock.readLock().unlock();
     }
 
-    if (coprocessorHost != null) {
-      coprocessorHost.postPut(put, walEdit, writeToWAL);
+    if (coprocessorHost != null && walEdit == null) {
+      coprocessorHost.postPut(put, localWalEdit, writeToWAL);
     }
 
     // do after lock
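[Reviewer note, not part of the patch: the two new parameters give internalPut and internalDelete two calling modes. The fragment below is illustrative only; it shows the convention, not code from the patch.]

    // Mode 1, the pre-existing single-operation path: internalPut builds
    // its own WALEdit, lets the memstore assign a fresh MVCC write point,
    // and runs the coprocessor pre/post hooks itself.
    internalPut(put, clusterId, writeToWAL, null, null);

    // Mode 2, the new mutateRow path: the caller already ran the pre-hooks
    // into walEdit and holds one shared MVCC write entry w for the whole
    // row mutation. internalPut therefore skips the hooks and tags its
    // memstore edits with w; the caller runs the post-hooks and completes
    // w after all mutations are in.
    internalPut(put, HConstants.DEFAULT_CLUSTER_ID, put.getWriteToWAL(),
        w, walEdit);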
@@ -4129,6 +4140,95 @@
     return results;
   }
 
+  public int mutateRow(RowMutation rm,
+      Integer lockid) throws IOException {
+
+    startRegionOperation();
+    List<WALEdit> walEdits = new ArrayList<WALEdit>(rm.getMutations().size());
+
+    // 1. run all pre-hooks before the atomic operation
+    // if any pre hook indicates "bypass", bypass the entire operation
+    // Note that this requires creating the WALEdits here and passing
+    // them to the actual Put/Delete operations.
+    for (Mutation m : rm.getMutations()) {
+      WALEdit walEdit = new WALEdit();
+      walEdits.add(walEdit);
+      if (coprocessorHost == null) {
+        continue;
+      }
+      if (m instanceof Put) {
+        if (coprocessorHost.prePut((Put) m, walEdit, m.getWriteToWAL())) {
+          // by pass everything
+          return 0;
+        }
+      } else if (m instanceof Delete) {
+        Delete d = (Delete) m;
+        prepareDelete(d);
+        if (coprocessorHost.preDelete(d, walEdit, d.getWriteToWAL())) {
+          // by pass everything
+          return 0;
+        }
+      }
+    }
+
+    // 2. acquire the row lock
+    Integer lid = getLock(lockid, rm.getRow(), true);
+
+    // 3. acquire the region lock
+    this.updatesLock.readLock().lock();
+
+    // 4. Get a mvcc write number
+    MultiVersionConsistencyControl.WriteEntry w = mvcc.beginMemstoreInsert();
+    try {
+      int i = 0;
+      // 5. Perform the actual mutations
+      for (Mutation m : rm.getMutations()) {
+        if (m instanceof Put) {
+          internalPut((Put) m, HConstants.DEFAULT_CLUSTER_ID,
+              m.getWriteToWAL(), w, walEdits.get(i));
+        } else if (m instanceof Delete) {
+          Delete d = (Delete) m;
+          prepareDelete(d);
+          internalDelete(d, HConstants.DEFAULT_CLUSTER_ID, d.getWriteToWAL(),
+              w, walEdits.get(i));
+        } else {
+          throw new DoNotRetryIOException(
+              "Action must be Put or Delete. But was: "
+              + m.getClass().getName());
+        }
+        i++;
+      }
+      return i;
+    } finally {
+      // 6. roll mvcc forward
+      mvcc.completeMemstoreInsert(w);
+      // 7. release region lock
+      this.updatesLock.readLock().unlock();
+      try {
+        // 8. run all coprocessor post hooks
+        if (coprocessorHost != null) {
+          int i = 0;
+          for (Mutation m : rm.getMutations()) {
+            if (m instanceof Put) {
+              coprocessorHost.postPut((Put) m, walEdits.get(i),
+                  m.getWriteToWAL());
+            } else if (m instanceof Delete) {
+              coprocessorHost.postDelete((Delete) m, walEdits.get(i),
+                  m.getWriteToWAL());
+            }
+            i++;
+          }
+        }
+      } finally {
+        if (lid != null) {
+          // 9. release the row lock
+          releaseRowLock(lid);
+        }
+        closeRegionOperation();
+      }
+    }
+  }
+
   // TODO: There's a lot of boiler plate code identical
   // to increment... See how to better unify that.
   /**
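[Reviewer note, not part of the patch: steps 4 and 6 above are what make the row mutation appear atomic to readers. The toy class below models the idea; it is deliberately simplified, whereas HBase's real MultiVersionConsistencyControl tracks many outstanding write entries and completes them in order.]

    // Toy MVCC model, illustrative only. Every edit of the row mutation is
    // tagged with one write number; readers ignore edits whose number is
    // above the read point, so the Put and the Delete become visible in
    // the same instant, when completeMemstoreInsert() advances it.
    final class ToyMvcc {
      private long nextWriteNumber = 1;  // next write point to hand out
      private long readPoint = 0;        // highest number visible to reads

      synchronized long beginMemstoreInsert() {
        return nextWriteNumber++;
      }

      synchronized void completeMemstoreInsert(long writeNumber) {
        readPoint = Math.max(readPoint, writeNumber);
        notifyAll();
      }

      synchronized boolean isVisible(long editWriteNumber) {
        return editWriteNumber <= readPoint;
      }
    }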
Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java	(revision 1231244)
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java	(working copy)
@@ -83,6 +83,7 @@
 import org.apache.hadoop.hbase.catalog.RootLocationEditor;
 import org.apache.hadoop.hbase.client.Action;
 import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.RowMutation;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HConnectionManager;
@@ -3152,6 +3153,27 @@
   }
 
   @Override
+  public void mutateRow(byte[] regionName, RowMutation rm)
+      throws IOException {
+    checkOpen();
+    if (regionName == null) {
+      throw new IOException("Invalid arguments to mutateRow: " +
+          "regionName is null");
+    }
+    requestCount.incrementAndGet();
+    try {
+      HRegion region = getRegion(regionName);
+      if (!region.getRegionInfo().isMetaTable()) {
+        this.cacheFlusher.reclaimMemStoreMemory();
+      }
+      region.mutateRow(rm, null);
+    } catch (IOException e) {
+      checkFileSystem();
+      throw e;
+    }
+  }
+
+  @Override
   public Result append(byte[] regionName, Append append)
       throws IOException {
     checkOpen();
@@ -3296,6 +3318,9 @@
           } else if (action instanceof Append) {
             response.add(regionName, originalIndex,
                 append(regionName, (Append)action));
+          } else if (action instanceof RowMutation) {
+            mutateRow(regionName, (RowMutation)action);
+            response.add(regionName, originalIndex, new Result());
           } else {
             LOG.debug("Error: invalid Action, row must be a Get, Delete, " +
                 "Put, Exec, Increment, or Append.");
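[Reviewer note, not part of the patch: because RowMutation implements Row, the multi() dispatch above suggests a RowMutation can also ride along in a client batch. A sketch under that assumption; the names are illustrative.]

    // A RowMutation mixed into a batch of actions. Per the dispatch above,
    // its slot in the results array comes back as an empty Result.
    List<Row> actions = new ArrayList<Row>();
    RowMutation rm = new RowMutation(Bytes.toBytes("row2"));
    Put p = new Put(Bytes.toBytes("row2"));
    p.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v"));
    rm.add(p);
    actions.add(rm);

    Object[] results = new Object[actions.size()];
    table.batch(actions, results);  // assumes an open HTable named "table"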
Index: src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java	(revision 1231244)
+++ src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java	(working copy)
@@ -501,6 +501,11 @@
         byte[] row) {
       return table.coprocessorProxy(protocol, row);
     }
+
+    @Override
+    public void mutateRow(RowMutation rm) throws IOException {
+      table.mutateRow(rm);
+    }
   }
 
   /** The coprocessor */
Index: src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java	(revision 1231244)
+++ src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java	(working copy)
@@ -48,6 +48,7 @@
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.RowMutation;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Increment;
@@ -232,10 +233,12 @@
 
     addToMap(ColumnRangeFilter.class, code++);
 
     addToMap(HServerLoad.class, code++);
-
+
     addToMap(RegionOpeningState.class, code++);
 
     addToMap(Append.class, code++);
+
+    addToMap(RowMutation.class, code++);
   }
 
   private Class declaredClass;
Index: src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java	(revision 1231244)
+++ src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java	(working copy)
@@ -29,6 +29,7 @@
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.RowMutation;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Increment;
@@ -262,6 +263,9 @@
       byte [] family, byte [] qualifier, long amount, boolean writeToWAL)
   throws IOException;
 
+  public void mutateRow(byte[] regionName, RowMutation rm)
+      throws IOException;
+
   /**
    * Appends values to one or more columns values in a row. Optionally
    * Returns the updated keys after the append.