Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestIncrement.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestIncrement.java (revision 1180702) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestIncrement.java (working copy) @@ -1,265 +0,0 @@ -/** - * Copyright 2010 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.regionserver; - - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.DoNotRetryIOException; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HBaseTestCase; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HDFSBlocksDistribution; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.HConstants.OperationStatusCode; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.MultithreadedTestUtil; -import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread; -import org.apache.hadoop.hbase.MiniHBaseCluster; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.filter.BinaryComparator; -import org.apache.hadoop.hbase.filter.ColumnCountGetFilter; -import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; -import org.apache.hadoop.hbase.filter.Filter; -import org.apache.hadoop.hbase.filter.FilterList; -import org.apache.hadoop.hbase.filter.NullComparator; -import org.apache.hadoop.hbase.filter.PrefixFilter; -import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; -import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl; -import org.apache.hadoop.hbase.regionserver.wal.HLog; -import org.apache.hadoop.hbase.util.Bytes; -import 
org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; -import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge; -import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.util.PairOfSameType; -import org.apache.hadoop.hbase.util.Threads; -import org.junit.Test; - -import com.google.common.collect.Lists; - - -/** - * Testing of HRegion.incrementColumnValue - * - */ -public class TestIncrement extends HBaseTestCase { - static final Log LOG = LogFactory.getLog(TestIncrement.class); - - HRegion region = null; - private final String DIR = HBaseTestingUtility.getTestDir() + - "/TestIncrement/"; - - private final int MAX_VERSIONS = 2; - - // Test names - static final byte[] tableName = Bytes.toBytes("testtable");; - static final byte[] qual1 = Bytes.toBytes("qual1"); - static final byte[] qual2 = Bytes.toBytes("qual2"); - static final byte[] qual3 = Bytes.toBytes("qual3"); - static final byte[] value1 = Bytes.toBytes("value1"); - static final byte[] value2 = Bytes.toBytes("value2"); - static final byte [] row = Bytes.toBytes("rowA"); - static final byte [] row2 = Bytes.toBytes("rowB"); - - /** - * @see org.apache.hadoop.hbase.HBaseTestCase#setUp() - */ - @Override - protected void setUp() throws Exception { - super.setUp(); - } - - @Override - protected void tearDown() throws Exception { - super.tearDown(); - EnvironmentEdgeManagerTestHelper.reset(); - } - - ////////////////////////////////////////////////////////////////////////////// - // New tests that doesn't spin up a mini cluster but rather just test the - // individual code pieces in the HRegion. - ////////////////////////////////////////////////////////////////////////////// - - /** - * Test one increment command. - */ - public void testIncrementColumnValue() throws IOException { - LOG.info("Starting test testIncrementColumnValue"); - initHRegion(tableName, getName(), fam1); - - long value = 1L; - long amount = 3L; - - Put put = new Put(row); - put.add(fam1, qual1, Bytes.toBytes(value)); - region.put(put); - - long result = region.incrementColumnValue(row, fam1, qual1, amount, true); - - assertEquals(value+amount, result); - - Store store = region.getStore(fam1); - // ICV removes any extra values floating around in there. - assertEquals(1, store.memstore.kvset.size()); - assertTrue(store.memstore.snapshot.isEmpty()); - - assertICV(row, fam1, qual1, value+amount); - } - - /** - * Test multi-threaded increments. 
- */ - public void testIncrementMultiThreads() throws IOException { - - LOG.info("Starting test testIncrementMultiThreads"); - initHRegion(tableName, getName(), fam1); - - // create 100 threads, each will increment by its own quantity - int numThreads = 100; - int incrementsPerThread = 1000; - Incrementer[] all = new Incrementer[numThreads]; - int expectedTotal = 0; - - // create all threads - for (int i = 0; i < numThreads; i++) { - all[i] = new Incrementer(region, i, i, incrementsPerThread); - expectedTotal += (i * incrementsPerThread); - } - - // run all threads - for (int i = 0; i < numThreads; i++) { - all[i].start(); - } - - // wait for all threads to finish - for (int i = 0; i < numThreads; i++) { - try { - all[i].join(); - } catch (InterruptedException e) { - } - } - assertICV(row, fam1, qual1, expectedTotal); - LOG.info("testIncrementMultiThreads successfully verified that total is " + - expectedTotal); - } - - - private void assertICV(byte [] row, - byte [] familiy, - byte[] qualifier, - long amount) throws IOException { - // run a get and see? - Get get = new Get(row); - get.addColumn(familiy, qualifier); - Result result = region.get(get, null); - assertEquals(1, result.size()); - - KeyValue kv = result.raw()[0]; - long r = Bytes.toLong(kv.getValue()); - assertEquals(amount, r); - } - - private void initHRegion (byte [] tableName, String callingMethod, - byte[] ... families) - throws IOException { - initHRegion(tableName, callingMethod, HBaseConfiguration.create(), families); - } - - private void initHRegion (byte [] tableName, String callingMethod, - Configuration conf, byte [] ... families) - throws IOException{ - HTableDescriptor htd = new HTableDescriptor(tableName); - for(byte [] family : families) { - htd.addFamily(new HColumnDescriptor(family)); - } - HRegionInfo info = new HRegionInfo(htd.getName(), null, null, false); - Path path = new Path(DIR + callingMethod); - if (fs.exists(path)) { - if (!fs.delete(path, true)) { - throw new IOException("Failed delete of " + path); - } - } - region = HRegion.createHRegion(info, path, conf, htd); - } - - /** - * A thread that makes a few increment calls - */ - public static class Incrementer extends Thread { - - private final HRegion region; - private final int threadNumber; - private final int numIncrements; - private final int amount; - - private int count; - - public Incrementer(HRegion region, - int threadNumber, int amount, int numIncrements) { - this.region = region; - this.threadNumber = threadNumber; - this.numIncrements = numIncrements; - this.count = 0; - this.amount = amount; - setDaemon(true); - } - - @Override - public void run() { - for (int i=0; i kvList : this.familyMap.values()) { + size += kvList.size(); + } + return size; + } + + /** + * @return the number of different families included in this put + */ + public int numFamilies() { + return familyMap.size(); + } } Index: src/main/java/org/apache/hadoop/hbase/client/Append.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/Append.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/client/Append.java (revision 0) @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.Writable; + +public class Append extends Mutation implements Writable { + // TODO: refactor to derive from Put? + private static final String RETURN_RESULTS = "_rr_"; + private static final byte APPEND_VERSION = (byte)1; + + public void setReturnResults(boolean returnResults) { + setAttribute(RETURN_RESULTS, Bytes.toBytes(returnResults)); + } + + public boolean isReturnResults() { + byte[] v = getAttribute(RETURN_RESULTS); + return v == null ? true : Bytes.toBoolean(v); + } + + /** Constructor for Writable. DO NOT USE */ + public Append() {} + + public Append(byte[] row) { + this.row = Arrays.copyOf(row, row.length); + } + + /** + * Add the specified column and value to this Append operation. + * @param family family name + * @param qualifier column qualifier + * @param value value to append to specified column + * @return this + */ + public Append add(byte [] family, byte [] qualifier, byte [] value) { + List list = familyMap.get(family); + if(list == null) { + list = new ArrayList(); + } + list.add(new KeyValue( + this.row, family, qualifier, this.ts, KeyValue.Type.Put, value)); + familyMap.put(family, list); + return this; + } + + @Override + public void readFields(final DataInput in) + throws IOException { + int version = in.readByte(); + if (version > APPEND_VERSION) { + throw new IOException("version not supported"); + } + this.row = Bytes.readByteArray(in); + this.ts = in.readLong(); + this.lockId = in.readLong(); + this.writeToWAL = in.readBoolean(); + int numFamilies = in.readInt(); + if (!this.familyMap.isEmpty()) this.familyMap.clear(); + for(int i=0;i keys = new ArrayList(numKeys); + int totalLen = in.readInt(); + byte [] buf = new byte[totalLen]; + int offset = 0; + for (int j = 0; j < numKeys; j++) { + int keyLength = in.readInt(); + in.readFully(buf, offset, keyLength); + keys.add(new KeyValue(buf, offset, keyLength)); + offset += keyLength; + } + this.familyMap.put(family, keys); + } + readAttributes(in); + } + + @Override + public void write(final DataOutput out) + throws IOException { + out.writeByte(APPEND_VERSION); + Bytes.writeByteArray(out, this.row); + out.writeLong(this.ts); + out.writeLong(this.lockId); + out.writeBoolean(this.writeToWAL); + out.writeInt(familyMap.size()); + for (Map.Entry> entry : familyMap.entrySet()) { + Bytes.writeByteArray(out, entry.getKey()); + List keys = entry.getValue(); + out.writeInt(keys.size()); + int totalLen = 0; + for(KeyValue kv : keys) { + totalLen += kv.getLength(); + } + out.writeInt(totalLen); + for(KeyValue kv : keys) { + out.writeInt(kv.getLength()); + out.write(kv.getBuffer(), kv.getOffset(), 
kv.getLength()); + } + } + writeAttributes(out); + } +} Index: src/main/java/org/apache/hadoop/hbase/client/Put.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/Put.java (revision 1180702) +++ src/main/java/org/apache/hadoop/hbase/client/Put.java (working copy) @@ -318,24 +318,6 @@ return list; } - /** - * @return the number of different families included in this put - */ - public int numFamilies() { - return familyMap.size(); - } - - /** - * @return the total number of KeyValues that will be added with this put - */ - public int size() { - int size = 0; - for(List kvList : this.familyMap.values()) { - size += kvList.size(); - } - return size; - } - //HeapSize public long heapSize() { long heapsize = OVERHEAD; Index: src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java (revision 1180702) +++ src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java (working copy) @@ -262,6 +262,21 @@ byte[] value, Delete delete) throws IOException; /** + * Appends values to one or more columns within a single row. + *

+   * This operation does not appear atomic to readers. Appends are done
+   * under a single row lock, so write operations to a row are synchronized, but
+   * readers do not take row locks so get and scan operations can see this
+   * operation partially completed.
+   *
+   * @param append object that specifies the columns and values to be
+   * appended to the row
+   * @throws IOException e
+   * @return values of columns after the append operation (may be null)
+   */
+  public Result append(final Append append) throws IOException;
+
+  /**
+   * Increments one or more columns within a single row.
+   * <p>
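// Illustrative usage sketch, not part of the patch: appending to a column through the new
// client API. The table instance "table" and the family/qualifier names are hypothetical;
// assumes the usual org.apache.hadoop.hbase.client imports and a running cluster.
Append append = new Append(Bytes.toBytes("row1"));
append.add(Bytes.toBytes("f"), Bytes.toBytes("tags"), Bytes.toBytes(",extra"));
append.setReturnResults(true);   // ask the server to return the post-append cells
Result result = table.append(append);
byte[] newValue = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("tags"));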

* This operation does not appear atomic to readers. Increments are done Index: src/main/java/org/apache/hadoop/hbase/client/HTable.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/HTable.java (revision 1180702) +++ src/main/java/org/apache/hadoop/hbase/client/HTable.java (working copy) @@ -740,6 +740,25 @@ * {@inheritDoc} */ @Override + public Result append(final Append append) throws IOException { + if (append.numFamilies() == 0) { + throw new IOException( + "Invalid arguments to append, no columns specified"); + } + return connection.getRegionServerWithRetries( + new ServerCallable(connection, tableName, append.getRow(), operationTimeout) { + public Result call() throws IOException { + return server.append( + location.getRegionInfo().getRegionName(), append); + } + } + ); + } + + /** + * {@inheritDoc} + */ + @Override public Result increment(final Increment increment) throws IOException { if (!increment.hasFamilies()) { throw new IOException( Index: src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java (revision 1180702) +++ src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java (working copy) @@ -947,6 +947,34 @@ } /** + * @param append append object + * @return result to return to client if default operation should be + * bypassed, null otherwise + * @throws IOException if an error occurred on the coprocessor + */ + public Result preAppend(Append append) + throws IOException { + boolean bypass = false; + Result result = new Result(); + ObserverContext ctx = null; + for (RegionEnvironment env: coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ctx = ObserverContext.createAndPrepare(env, ctx); + try { + ((RegionObserver)env.getInstance()).preAppend(ctx, append, result); + } catch (Throwable e) { + handleCoprocessorThrowable(env, e); + } + bypass |= ctx.shouldBypass(); + if (ctx.shouldComplete()) { + break; + } + } + } + return bypass ? 
result : null; + } + + /** * @param increment increment object * @return result to return to client if default operation should be * bypassed, null otherwise @@ -975,6 +1003,29 @@ } /** + * @param appent Append object + * @param result the result returned by postAppend + * @throws IOException if an error occurred on the coprocessor + */ + public void postAppend(final Append append, Result result) + throws IOException { + ObserverContext ctx = null; + for (RegionEnvironment env: coprocessors) { + if (env.getInstance() instanceof RegionObserver) { + ctx = ObserverContext.createAndPrepare(env, ctx); + try { + ((RegionObserver)env.getInstance()).postAppend(ctx, append, result); + } catch (Throwable e) { + handleCoprocessorThrowable(env, e); + } + if (ctx.shouldComplete()) { + break; + } + } + } + } + + /** * @param increment increment object * @param result the result returned by postIncrement * @throws IOException if an error occurred on the coprocessor Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1180702) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -68,6 +68,7 @@ import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.HConstants.OperationStatusCode; +import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; @@ -3502,7 +3503,150 @@ return results; } + // TODO: There's a lot of boiler plate code identical + // to increment... See how to better unify that. /** + * + * Perform one or more append operations on a row. + *

+ * Appends performed are done under row lock but reads do not take locks out + * so this can be seen partially complete by gets and scans. + * + * @param append + * @param lockid + * @param returnResult + * @param writeToWAL + * @return new keyvalues after increment + * @throws IOException + */ + public Result append(Append append, Integer lockid, boolean writeToWAL) + throws IOException { + // TODO: Use RWCC to make this set of increments atomic to reads + byte[] row = append.getRow(); + checkRow(row, "increment"); + boolean flush = false; + WALEdit walEdits = null; + List allKVs = new ArrayList(append.size()); + List kvs = new ArrayList(append.size()); + long now = EnvironmentEdgeManager.currentTimeMillis(); + long size = 0; + long txid = 0; + + // Lock row + startRegionOperation(); + this.writeRequestsCount.increment(); + try { + Integer lid = getLock(lockid, row, true); + this.updatesLock.readLock().lock(); + try { + // Process each family + for (Map.Entry> family : append.getFamilyMap() + .entrySet()) { + + Store store = stores.get(family.getKey()); + + // Get previous values for all columns in this family + Get get = new Get(row); + for (KeyValue kv : family.getValue()) { + get.addColumn(family.getKey(), kv.getQualifier()); + } + List results = get(get, false); + + // Iterate the input columns and update existing values if they were + // found, otherwise add new column initialized to the append value + + // Avoid as much copying as possible. Every byte is copied at most + // once. + // Would be nice if KeyValue had scatter/gather logic + int idx = 0; + for (KeyValue kv : family.getValue()) { + KeyValue newKV; + if (idx < results.size() + && results.get(idx).matchingQualifier(kv.getBuffer(), + kv.getQualifierOffset(), kv.getQualifierLength())) { + KeyValue oldKv = results.get(idx); + // allocate an empty kv once + newKV = new KeyValue(row.length, kv.getFamilyLength(), + kv.getQualifierLength(), now, KeyValue.Type.Put, + oldKv.getValueLength() + kv.getValueLength()); + // copy in the value + System.arraycopy(oldKv.getBuffer(), oldKv.getValueOffset(), + newKV.getBuffer(), newKV.getValueOffset(), + oldKv.getValueLength()); + System.arraycopy(kv.getBuffer(), kv.getValueOffset(), + newKV.getBuffer(), + newKV.getValueOffset() + oldKv.getValueLength(), + kv.getValueLength()); + idx++; + } else { + // allocate an empty kv once + newKV = new KeyValue(row.length, kv.getFamilyLength(), + kv.getQualifierLength(), now, KeyValue.Type.Put, + kv.getValueLength()); + // copy in the value + System.arraycopy(kv.getBuffer(), kv.getValueOffset(), + newKV.getBuffer(), newKV.getValueOffset(), + kv.getValueLength()); + } + // copy in row, family, and qualifier + System.arraycopy(kv.getBuffer(), kv.getRowOffset(), + newKV.getBuffer(), newKV.getRowOffset(), kv.getRowLength()); + System.arraycopy(kv.getBuffer(), kv.getFamilyOffset(), + newKV.getBuffer(), newKV.getFamilyOffset(), + kv.getFamilyLength()); + System.arraycopy(kv.getBuffer(), kv.getQualifierOffset(), + newKV.getBuffer(), newKV.getQualifierOffset(), + kv.getQualifierLength()); + + kvs.add(newKV); + + // Append update to WAL + if (writeToWAL) { + if (walEdits == null) { + walEdits = new WALEdit(); + } + walEdits.add(newKV); + } + } + + // Write the KVs for this family into the store + size += store.upsert(kvs); + allKVs.addAll(kvs); + kvs.clear(); + } + + // Actually write to WAL now + if (writeToWAL) { + // Using default cluster id, as this can only happen in the orginating + // cluster. 
A slave cluster receives the final value (not the delta) + // as a Put. + txid = this.log.appendNoSync(regionInfo, + this.htableDescriptor.getName(), walEdits, + HConstants.DEFAULT_CLUSTER_ID, now, this.htableDescriptor); + } + + size = this.addAndGetGlobalMemstoreSize(size); + flush = isFlushSize(size); + } finally { + this.updatesLock.readLock().unlock(); + releaseRowLock(lid); + } + if (writeToWAL) { + this.log.sync(txid); // sync the transaction log outside the rowlock + } + } finally { + closeRegionOperation(); + } + + if (flush) { + // Request a cache flush. Do it outside update lock. + requestFlush(); + } + + return append.isReturnResults() ? new Result(allKVs) : null; + } + + /** * * Perform one or more increment operations on a row. *
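// Illustrative sketch, not part of the patch: the server-side append above concatenates the
// existing cell value with the appended bytes; at the byte level it is equivalent to this.
byte[] oldValue = Bytes.toBytes("abc");
byte[] delta = Bytes.toBytes("def");
byte[] merged = new byte[oldValue.length + delta.length];
System.arraycopy(oldValue, 0, merged, 0, oldValue.length);
System.arraycopy(delta, 0, merged, oldValue.length, delta.length);
// merged now holds "abcdef"; a subsequent Get on the column sees the combined value.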

Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 1180702) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -80,6 +80,7 @@ import org.apache.hadoop.hbase.catalog.MetaEditor; import org.apache.hadoop.hbase.catalog.RootLocationEditor; import org.apache.hadoop.hbase.client.Action; +import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; @@ -123,7 +124,6 @@ import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; -import org.apache.hadoop.hbase.replication.regionserver.Replication; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CompressionTest; @@ -2829,6 +2829,37 @@ } @Override + public Result append(byte[] regionName, Append append) + throws IOException { + checkOpen(); + if (regionName == null) { + throw new IOException("Invalid arguments to increment " + + "regionName is null"); + } + requestCount.incrementAndGet(); + try { + HRegion region = getRegion(regionName); + Integer lock = getLockFromId(append.getLockId()); + Append appVal = append; + Result resVal; + if (region.getCoprocessorHost() != null) { + resVal = region.getCoprocessorHost().preAppend(appVal); + if (resVal != null) { + return resVal; + } + } + resVal = region.append(appVal, lock, append.getWriteToWAL()); + if (region.getCoprocessorHost() != null) { + region.getCoprocessorHost().postAppend(appVal, resVal); + } + return resVal; + } catch (IOException e) { + checkFileSystem(); + throw e; + } + } + + @Override public Result increment(byte[] regionName, Increment increment) throws IOException { checkOpen(); Index: src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java (revision 1180702) +++ src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java (working copy) @@ -394,6 +394,11 @@ } @Override + public Result append(Append append) throws IOException { + return table.append(append); + } + + @Override public Result increment(Increment increment) throws IOException { return table.increment(increment); } Index: src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java (revision 1180702) +++ src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java (working copy) @@ -23,6 +23,7 @@ import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; @@ -185,6 +186,16 @@ } @Override + public void preAppend(final ObserverContext e, + final Append append, final Result result) throws IOException { + } + + @Override + public void postAppend(final ObserverContext e, + final Append append, 
final Result result) throws IOException { + } + + @Override public long preIncrementColumnValue(final ObserverContext e, final byte [] row, final byte [] family, final byte [] qualifier, final long amount, final boolean writeToWAL) throws IOException { Index: src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java (revision 1180702) +++ src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java (working copy) @@ -23,6 +23,7 @@ import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; @@ -458,6 +459,38 @@ throws IOException; /** + * Called before Append + *

+   * Call CoprocessorEnvironment#bypass to skip default actions
+   * <p>

+   * Call CoprocessorEnvironment#complete to skip any subsequent chained
+   * coprocessors
+   * @param c the environment provided by the region server
+   * @param append Append object
+   * @param result The result to return to the client if default processing
+   * is bypassed. Can be modified. Will not be used if default processing
+   * is not bypassed.
+   * @throws IOException if an error occurred on the coprocessor
+   */
+  void preAppend(final ObserverContext<RegionCoprocessorEnvironment> c,
+      final Append append, final Result result)
+    throws IOException;
+
+  /**
+   * Called after Append
+   * <p>

+   * Call CoprocessorEnvironment#complete to skip any subsequent chained
+   * coprocessors
+   * @param c the environment provided by the region server
+   * @param append Append object
+   * @param result the result returned by append, can be modified
+   * @throws IOException if an error occurred on the coprocessor
+   */
+  void postAppend(final ObserverContext<RegionCoprocessorEnvironment> c,
+      final Append append, final Result result)
+    throws IOException;
+
+  /**
+   * Called before Increment
+   * <p>
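// Illustrative sketch, not part of the patch: a RegionObserver built on the new hooks that
// vetoes appends against a hypothetical "readonly" family by bypassing the default action.
// The class and family names are assumptions for the example.
import java.io.IOException;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Bytes;

public class AppendGuardObserver extends BaseRegionObserver {
  private static final byte[] READONLY = Bytes.toBytes("readonly"); // hypothetical family

  @Override
  public void preAppend(final ObserverContext<RegionCoprocessorEnvironment> c,
      final Append append, final Result result) throws IOException {
    if (append.getFamilyMap().containsKey(READONLY)) {
      c.bypass();   // skip the default append; the unmodified result goes back to the client
    }
  }
}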

   * Call CoprocessorEnvironment#bypass to skip default actions
Index: src/main/java/org/apache/hadoop/hbase/KeyValue.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/KeyValue.java	(revision 1180702)
+++ src/main/java/org/apache/hadoop/hbase/KeyValue.java	(working copy)
@@ -406,6 +406,82 @@
   }
   /**
+   * Constructs an empty KeyValue structure, with specified sizes.
+   * This can be used to partially fill up KeyValues.
+   * <p>

+ * Column is split into two fields, family and qualifier. + * @param rlength row length + * @param flength family length + * @param qlength qualifier length + * @param timestamp version timestamp + * @param type key type + * @param vlength value length + * @throws IllegalArgumentException + */ + public KeyValue(final int rlength, + final int flength, + final int qlength, + final long timestamp, final Type type, + final int vlength) { + this.bytes = createEmptyByteArray(rlength, + flength, qlength, + timestamp, type, vlength); + this.length = bytes.length; + this.offset = 0; + } + + /** + * Create an empty byte[] representing a KeyValue + * All lengths are preset and can be filled in later. + * @param rlength + * @param flength + * @param qlength + * @param timestamp + * @param type + * @param vlength + * @return The newly created byte array. + */ + static byte[] createEmptyByteArray(final int rlength, int flength, + int qlength, final long timestamp, final Type type, int vlength) { + if (rlength > Short.MAX_VALUE) { + throw new IllegalArgumentException("Row > " + Short.MAX_VALUE); + } + if (flength > Byte.MAX_VALUE) { + throw new IllegalArgumentException("Family > " + Byte.MAX_VALUE); + } + // Qualifier length + if (qlength > Integer.MAX_VALUE - rlength - flength) { + throw new IllegalArgumentException("Qualifier > " + Integer.MAX_VALUE); + } + // Key length + long longkeylength = KEY_INFRASTRUCTURE_SIZE + rlength + flength + qlength; + if (longkeylength > Integer.MAX_VALUE) { + throw new IllegalArgumentException("keylength " + longkeylength + " > " + + Integer.MAX_VALUE); + } + int keylength = (int)longkeylength; + // Value length + if (vlength > HConstants.MAXIMUM_VALUE_LENGTH) { // FindBugs INT_VACUOUS_COMPARISON + throw new IllegalArgumentException("Valuer > " + + HConstants.MAXIMUM_VALUE_LENGTH); + } + + // Allocate right-sized byte array. + byte [] bytes = new byte[KEYVALUE_INFRASTRUCTURE_SIZE + keylength + vlength]; + // Write the correct size markers + int pos = 0; + pos = Bytes.putInt(bytes, pos, keylength); + pos = Bytes.putInt(bytes, pos, vlength); + pos = Bytes.putShort(bytes, pos, (short)(rlength & 0x0000ffff)); + pos += rlength; + pos = Bytes.putByte(bytes, pos, (byte)(flength & 0x0000ff)); + pos += flength + qlength; + pos = Bytes.putLong(bytes, pos, timestamp); + pos = Bytes.putByte(bytes, pos, type.getCode()); + return bytes; + } + + /** * Write KeyValue format into a byte array. 
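// Illustrative sketch, not part of the patch: filling in the new pre-sized KeyValue after
// construction, mirroring what HRegion.append does when it merges an old and a new value.
// All row/family/qualifier/value literals here are hypothetical.
byte[] row = Bytes.toBytes("r"), fam = Bytes.toBytes("f"), qual = Bytes.toBytes("q");
byte[] oldVal = Bytes.toBytes("old"), addVal = Bytes.toBytes("+new");
KeyValue kv = new KeyValue(row.length, fam.length, qual.length,
    System.currentTimeMillis(), KeyValue.Type.Put, oldVal.length + addVal.length);
System.arraycopy(row, 0, kv.getBuffer(), kv.getRowOffset(), row.length);
System.arraycopy(fam, 0, kv.getBuffer(), kv.getFamilyOffset(), fam.length);
System.arraycopy(qual, 0, kv.getBuffer(), kv.getQualifierOffset(), qual.length);
System.arraycopy(oldVal, 0, kv.getBuffer(), kv.getValueOffset(), oldVal.length);
System.arraycopy(addVal, 0, kv.getBuffer(), kv.getValueOffset() + oldVal.length, addVal.length);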
* * @param row row key Index: src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java (revision 1180702) +++ src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java (working copy) @@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.HServerLoad; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; @@ -237,7 +238,9 @@ addToMap(HServerLoad.class, code++); addToMap(RegionOpeningState.class, code++); - + + addToMap(Append.class, code++); + } private Class declaredClass; Index: src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (revision 1180702) +++ src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (working copy) @@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.HServerInfo; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; @@ -211,6 +212,19 @@ throws IOException; /** + * Appends values to one or more columns values in a row. Optionally + * Returns the updated keys after the append. + *
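// Illustrative sketch, not part of the patch: Append implements Writable and is registered in
// HbaseObjectWritable above, so it round-trips the way the RPC layer serializes it.
// Assumes java.io stream classes are imported; values are hypothetical.
Append sent = new Append(Bytes.toBytes("row1"));
sent.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("suffix"));
ByteArrayOutputStream buf = new ByteArrayOutputStream();
sent.write(new DataOutputStream(buf));
Append received = new Append();   // Writable constructor
received.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));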

+   * This operation does not appear atomic to readers. Appends are done
+   * under a row lock but readers do not take row locks.
+   * @param regionName region name
+   * @param append Append operation
+   * @return changed cells (maybe null)
+   */
+  public Result append(byte[] regionName, Append append)
+  throws IOException;
+
+  /**
+   * Increments one or more columns values in a row. Returns the
+   * updated keys after the increment.
+   *
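// Illustrative sketch, not part of the patch: callers that do not need the post-append cell
// values can suppress the returned Result, in which case the region returns null. Uses the
// same hypothetical HTable instance "table" and column names as the earlier example.
Append logAppend = new Append(Bytes.toBytes("log-row"));
logAppend.add(Bytes.toBytes("f"), Bytes.toBytes("events"), Bytes.toBytes("|event"));
logAppend.setReturnResults(false);
Result r = table.append(logAppend);   // r is null when results are not requested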