Index: hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
===================================================================
--- hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java (revision 1538315)
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java (working copy)
@@ -627,7 +627,9 @@
               "Missing required field: qualifer value");
           }
           byte[] value = qv.getValue().toByteArray();
-          append.add(family, qualifier, value);
+          byte[] tags = qv.getTags().toByteArray();
+          append.add(CellUtil.createCell(row, family, qualifier, append.getTimeStamp(),
+              KeyValue.Type.Put, value, tags));
         }
       }
     }
@@ -699,8 +701,10 @@
           if (!qv.hasValue()) {
             throw new DoNotRetryIOException("Missing required field: qualifer value");
           }
-          long value = Bytes.toLong(qv.getValue().toByteArray());
-          increment.addColumn(family, qualifier, value);
+          byte[] value = qv.getValue().toByteArray();
+          byte[] tags = qv.getTags().toByteArray();
+          increment.add(CellUtil.createCell(row, family, qualifier, increment.getTimeStamp(),
+              KeyValue.Type.Put, value, tags));
         }
       }
     }
@@ -973,6 +977,10 @@
               kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength()));
           valueBuilder.setValue(ZeroCopyLiteralByteString.wrap(
               kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
+          if (kv.getTagsLength() > 0) {
+            valueBuilder.setTags(ZeroCopyLiteralByteString.wrap(kv.getTagsArray(),
+                kv.getTagsOffset(), kv.getTagsLength()));
+          }
           columnBuilder.addQualifierValue(valueBuilder.build());
         }
       }
Index: hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
===================================================================
--- hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java (revision 1538315)
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java (working copy)
@@ -172,6 +172,12 @@
     return keyValue;
   }
 
+  public static Cell createCell(final byte[] row, final byte[] family, final byte[] qualifier,
+      final long timestamp, Type type, final byte[] value, byte[] tags) {
+    KeyValue keyValue = new KeyValue(row, family, qualifier, timestamp, type, value, tags);
+    return keyValue;
+  }
+
   /**
    * @param cellScannerables
    * @return CellScanner interface over cellIterables
Index: hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
===================================================================
--- hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java (revision 1538315)
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java (working copy)
@@ -809,7 +809,7 @@
     pos += flength + qlength;
     pos = Bytes.putLong(bytes, pos, timestamp);
     pos = Bytes.putByte(bytes, pos, type.getCode());
-    pos += keylength + vlength;
+    pos += vlength;
     if (tagsLength > 0) {
       pos = Bytes.putShort(bytes, pos, (short)(tagsLength & 0x0000ffff));
     }
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java (revision 1538315)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java (working copy)
@@ -477,4 +477,10 @@
       DataBlockEncoding preferredEncodingInCache, Reference r, Reader reader) throws IOException {
     return reader;
   }
+
+  @Override
+  public Cell postMutationBeforeWAL(ObserverContext<RegionCoprocessorEnvironment> ctx,
+      MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException {
+    return newCell;
+  }
 }
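For orientation, here is a minimal round trip through the tag-aware cell constructors touched above; it is a sketch, not part of the patch. The scratch class name TaggedCellSketch is invented, while the Tag, KeyValue and CellUtil calls are the ones this patch adds or already exercises. With the corrected offset arithmetic in KeyValue (pos += vlength), the serialized tag block lands directly after the value, so the tags read back intact:

import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.util.Bytes;

// Scratch class for illustration only; not part of the patch.
public class TaggedCellSketch {
  public static void main(String[] args) {
    // Build a cell carrying one tag, using the same constructor the new test exercises.
    KeyValue kv = new KeyValue(Bytes.toBytes("r1"), Bytes.toBytes("f"), Bytes.toBytes("q"),
        1234L, Bytes.toBytes("v"), new Tag[] { new Tag((byte) 1, "tag1") });
    List<Tag> tags = kv.getTags();
    System.out.println(Bytes.toString(tags.get(0).getValue())); // prints "tag1"

    // The new CellUtil.createCell overload is the byte[]-based equivalent that
    // ProtobufUtil now uses; it takes the already-serialized tag block.
    byte[] tagBytes = new byte[kv.getTagsLength()];
    System.arraycopy(kv.getTagsArray(), kv.getTagsOffset(), tagBytes, 0, tagBytes.length);
    Cell cell = CellUtil.createCell(Bytes.toBytes("r1"), Bytes.toBytes("f"),
        Bytes.toBytes("q"), 1234L, KeyValue.Type.Put, Bytes.toBytes("v"), tagBytes);
    System.out.println(cell.getTagsLength() == tagBytes.length); // expected: true
  }
}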
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java (revision 1538315)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java (working copy)
@@ -67,6 +67,11 @@
 @InterfaceStability.Evolving
 public interface RegionObserver extends Coprocessor {
 
+  /** Mutation type for postMutationBeforeWAL hook */
+  public enum MutationType {
+    APPEND, INCREMENT
+  }
+
   /**
    * Called before the region is reported as open to the master.
    * @param c the environment provided by the region server
@@ -1052,4 +1057,20 @@
       final FileSystem fs, final Path p, final FSDataInputStreamWrapper in, long size,
       final CacheConfig cacheConf, final DataBlockEncoding preferredEncodingInCache,
       final Reference r, StoreFile.Reader reader) throws IOException;
+
+  /**
+   * Called after a new cell has been created during an increment operation, but before
+   * it is committed to the WAL or memstore.
+   * Calling {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} has no
+   * effect in this hook.
+   * @param ctx the environment provided by the region server
+   * @param opType the operation type
+   * @param mutation the current mutation
+   * @param oldCell old cell containing previous value
+   * @param newCell the new cell containing the computed value
+   * @return the new cell, possibly changed
+   * @throws IOException
+   */
+  Cell postMutationBeforeWAL(ObserverContext<RegionCoprocessorEnvironment> ctx,
+      MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException;
 }
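To make the contract above concrete, a minimal sketch of a coprocessor using the new hook follows; it is illustrative only. AuditTagObserver and MAX_TAGS_PER_CELL are invented for the example, everything else is the API added or shown in this patch. The observer only validates the merged cell, since bypass() has no effect here and whatever cell is returned is what gets written:

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver.MutationType;

// Hypothetical observer, not part of the patch.
public class AuditTagObserver extends BaseRegionObserver {
  private static final int MAX_TAGS_PER_CELL = 8; // invented limit for the example

  @Override
  public Cell postMutationBeforeWAL(ObserverContext<RegionCoprocessorEnvironment> ctx,
      MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException {
    // At this point the region server has already merged the tags of oldCell and the
    // client-supplied cell into newCell; the cell returned here goes to the WAL and memstore.
    List<Tag> tags = KeyValueUtil.ensureKeyValue(newCell).getTags();
    if (tags.size() > MAX_TAGS_PER_CELL) {
      throw new IOException("Too many tags after " + opType + ": " + tags.size());
    }
    return newCell;
  }
}

Such an observer could be wired up per table with HTableDescriptor.addCoprocessor(AuditTagObserver.class.getName()), or region-server-wide via hbase.coprocessor.region.classes.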
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1538315)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy)
@@ -94,6 +94,7 @@
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
 import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
 import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
@@ -4667,13 +4668,15 @@
           for (Cell cell : family.getValue()) {
             KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
             KeyValue newKV;
+            KeyValue oldKv = null;
             if (idx < results.size()
                 && CellUtil.matchingQualifier(results.get(idx),kv)) {
-              KeyValue oldKv = KeyValueUtil.ensureKeyValue(results.get(idx));
+              oldKv = KeyValueUtil.ensureKeyValue(results.get(idx));
               // allocate an empty kv once
               newKV = new KeyValue(row.length, kv.getFamilyLength(),
                   kv.getQualifierLength(), now, KeyValue.Type.Put,
-                  oldKv.getValueLength() + kv.getValueLength());
+                  oldKv.getValueLength() + kv.getValueLength(),
+                  oldKv.getTagsLength() + kv.getTagsLength());
               // copy in the value
               System.arraycopy(oldKv.getBuffer(), oldKv.getValueOffset(),
                   newKV.getBuffer(), newKV.getValueOffset(),
@@ -4682,16 +4685,24 @@
                   newKV.getBuffer(),
                   newKV.getValueOffset() + oldKv.getValueLength(),
                   kv.getValueLength());
+              // copy in the tags
+              System.arraycopy(oldKv.getBuffer(), oldKv.getTagsOffset(), newKV.getBuffer(),
+                  newKV.getTagsOffset(), oldKv.getTagsLength());
+              System.arraycopy(kv.getBuffer(), kv.getTagsOffset(), newKV.getBuffer(),
+                  newKV.getTagsOffset() + oldKv.getTagsLength(), kv.getTagsLength());
               idx++;
             } else {
               // allocate an empty kv once
               newKV = new KeyValue(row.length, kv.getFamilyLength(),
                   kv.getQualifierLength(), now, KeyValue.Type.Put,
-                  kv.getValueLength());
+                  kv.getValueLength(), kv.getTagsLength());
               // copy in the value
               System.arraycopy(kv.getBuffer(), kv.getValueOffset(),
                   newKV.getBuffer(), newKV.getValueOffset(),
                   kv.getValueLength());
+              // copy in tags
+              System.arraycopy(kv.getBuffer(), kv.getTagsOffset(), newKV.getBuffer(),
+                  newKV.getTagsOffset(), kv.getTagsLength());
             }
             // copy in row, family, and qualifier
             System.arraycopy(kv.getBuffer(), kv.getRowOffset(),
@@ -4704,6 +4715,11 @@
                 kv.getQualifierLength());
 
             newKV.setMvccVersion(w.getWriteNumber());
+            // Give coprocessors a chance to update the new cell
+            if (coprocessorHost != null) {
+              newKV = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL(
+                  RegionObserver.MutationType.APPEND, append, oldKv, (Cell) newKV));
+            }
             kvs.add(newKV);
 
             // Append update to WAL
@@ -4837,8 +4853,9 @@
         int idx = 0;
         for (Cell kv: family.getValue()) {
           long amount = Bytes.toLong(CellUtil.cloneValue(kv));
+          Cell c = null;
           if (idx < results.size() && CellUtil.matchingQualifier(results.get(idx), kv)) {
-            Cell c = results.get(idx);
+            c = results.get(idx);
             if(c.getValueLength() == Bytes.SIZEOF_LONG) {
               amount += Bytes.toLong(c.getValueArray(), c.getValueOffset(), Bytes.SIZEOF_LONG);
             } else {
@@ -4850,9 +4867,33 @@
           }
 
           // Append new incremented KeyValue to list
-          KeyValue newKV =
-            new KeyValue(row, family.getKey(), CellUtil.cloneQualifier(kv), now, Bytes.toBytes(amount));
+          byte[] q = CellUtil.cloneQualifier(kv);
+          byte[] val = Bytes.toBytes(amount);
+          int oldCellTagsLen = (c == null) ? 0 : c.getTagsLength();
+          int incCellTagsLen = kv.getTagsLength();
+          KeyValue newKV = new KeyValue(row.length, family.getKey().length, q.length, now,
+              KeyValue.Type.Put, val.length, oldCellTagsLen + incCellTagsLen);
+          System.arraycopy(row, 0, newKV.getBuffer(), newKV.getRowOffset(), row.length);
+          System.arraycopy(family.getKey(), 0, newKV.getBuffer(), newKV.getFamilyOffset(),
+              family.getKey().length);
+          System.arraycopy(q, 0, newKV.getBuffer(), newKV.getQualifierOffset(), q.length);
+          // copy in the value
+          System.arraycopy(val, 0, newKV.getBuffer(), newKV.getValueOffset(), val.length);
+          // copy tags
+          if (oldCellTagsLen > 0) {
+            System.arraycopy(c.getTagsArray(), c.getTagsOffset(), newKV.getBuffer(),
+                newKV.getTagsOffset(), oldCellTagsLen);
+          }
+          if (incCellTagsLen > 0) {
+            System.arraycopy(kv.getTagsArray(), kv.getTagsOffset(), newKV.getBuffer(),
+                newKV.getTagsOffset() + oldCellTagsLen, incCellTagsLen);
+          }
           newKV.setMvccVersion(w.getWriteNumber());
+          // Give coprocessors a chance to update the new cell
+          if (coprocessorHost != null) {
+            newKV = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL(
+                RegionObserver.MutationType.INCREMENT, increment, c, (Cell) newKV));
+          }
           kvs.add(newKV);
 
           // Prepare WAL updates
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java (revision 1538315)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java (working copy)
@@ -55,6 +55,7 @@
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver.MutationType;
 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
@@ -1708,4 +1709,24 @@
     }
     return reader;
   }
+
+  public Cell postMutationBeforeWAL(MutationType opType, Mutation mutation, Cell oldCell,
+      Cell newCell) throws IOException {
+    ObserverContext<RegionCoprocessorEnvironment> ctx = null;
+    for (RegionEnvironment env : coprocessors) {
+      if (env.getInstance() instanceof RegionObserver) {
+        ctx = ObserverContext.createAndPrepare(env, ctx);
+        try {
+          newCell = ((RegionObserver) env.getInstance()).postMutationBeforeWAL(ctx, opType,
+              mutation, oldCell, newCell);
+        } catch (Throwable e) {
+          handleCoprocessorThrowable(env, e);
+        }
+        if (ctx.shouldComplete()) {
+          break;
+        }
+      }
+    }
+    return newCell;
+  }
 }
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java (revision 1538315)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java (working copy)
@@ -17,13 +17,12 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.NavigableMap;
-import java.util.TreeMap;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
@@ -37,9 +36,11 @@
 import org.apache.hadoop.hbase.MediumTests;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
@@ -55,8 +56,10 @@
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
 
 /**
  * Class that test tags
@@ -67,6 +70,9 @@
 
   private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 
+  @Rule
+  public final TestName TEST_NAME = new TestName();
+
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     Configuration conf = TEST_UTIL.getConfiguration();
@@ -90,7 +96,7 @@
   public void testTags() throws Exception {
     HTable table = null;
     try {
-      TableName tableName = TableName.valueOf("testTags");
+      TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
       byte[] fam = Bytes.toBytes("info");
       byte[] row = Bytes.toBytes("rowa");
       // column names
@@ -168,7 +174,7 @@
   public void testFlushAndCompactionWithoutTags() throws Exception {
     HTable table = null;
     try {
-      TableName tableName = TableName.valueOf("testFlushAndCompactionWithoutTags");
+      TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
       byte[] fam = Bytes.toBytes("info");
       byte[] row = Bytes.toBytes("rowa");
       // column names
@@ -270,7 +276,7 @@
   public void testFlushAndCompactionwithCombinations() throws Exception {
     HTable table = null;
     try {
-      TableName tableName = TableName.valueOf("testFlushAndCompactionwithCombinations");
+      TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
      byte[] fam = Bytes.toBytes("info");
       byte[] row = Bytes.toBytes("rowa");
       // column names
@@ -394,6 +400,114 @@
     }
   }
 
+  @Test
+  public void testTagsWithAppendAndIncrement() throws Exception {
+    TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
+    byte[] f = Bytes.toBytes("f");
+    byte[] q = Bytes.toBytes("q");
+    byte[] row1 = Bytes.toBytes("r1");
+    byte[] row2 = Bytes.toBytes("r2");
+
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    HColumnDescriptor colDesc = new HColumnDescriptor(f);
+    desc.addFamily(colDesc);
+    TEST_UTIL.getHBaseAdmin().createTable(desc);
+
+    HTable table = null;
+    try {
+      table = new HTable(TEST_UTIL.getConfiguration(), tableName);
+      Put put = new Put(row1);
+      byte[] v = Bytes.toBytes(2L);
+      put.add(f, q, v, new Tag[] { new Tag((byte) 1, "tag1") });
+      table.put(put);
+      Increment increment = new Increment(row1);
+      increment.addColumn(f, q, 1L);
+      table.increment(increment);
+      ResultScanner scanner = table.getScanner(new Scan());
+      Result result = scanner.next();
+      KeyValue kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
+      List<Tag> tags = kv.getTags();
+      assertEquals(3L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
+      assertEquals(1, tags.size());
+      assertEquals("tag1", Bytes.toString(tags.get(0).getValue()));
+      increment = new Increment(row1);
+      increment.add(new KeyValue(row1, f, q, 1234L, v, new Tag[] { new Tag((byte) 1, "tag2") }));
+      table.increment(increment);
+      scanner = table.getScanner(new Scan());
+      result = scanner.next();
+      kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
+      tags = kv.getTags();
+      assertEquals(5L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
+      assertEquals(2, tags.size());
+      assertEquals("tag1", Bytes.toString(tags.get(0).getValue()));
+      assertEquals("tag2", Bytes.toString(tags.get(1).getValue()));
+
+      put = new Put(row2);
+      v = Bytes.toBytes(2L);
+      put.add(f, q, v);
+      table.put(put);
+      increment = new Increment(row2);
+      increment.add(new KeyValue(row2, f, q, 1234L, v, new Tag[] { new Tag((byte) 1, "tag2") }));
+      table.increment(increment);
+      Scan scan = new Scan();
+      scan.setStartRow(row2);
+      scanner = table.getScanner(scan);
+      result = scanner.next();
+      kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
+      tags = kv.getTags();
+      assertEquals(4L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
+      assertEquals(1, tags.size());
+      assertEquals("tag2", Bytes.toString(tags.get(0).getValue()));
+
+      // Test Append
+      byte[] row3 = Bytes.toBytes("r3");
+      put = new Put(row3);
+      put.add(f, q, Bytes.toBytes("a"), new Tag[] { new Tag((byte) 1, "tag1") });
+      table.put(put);
+      Append append = new Append(row3);
+      append.add(f, q, Bytes.toBytes("b"));
+      table.append(append);
+      scan = new Scan();
+      scan.setStartRow(row3);
+      scanner = table.getScanner(scan);
+      result = scanner.next();
+      kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
+      tags = kv.getTags();
+      assertEquals(1, tags.size());
+      assertEquals("tag1", Bytes.toString(tags.get(0).getValue()));
+      append = new Append(row3);
+      append.add(new KeyValue(row3, f, q, 1234L, v, new Tag[] { new Tag((byte) 1, "tag2") }));
+      table.append(append);
+      scanner = table.getScanner(scan);
+      result = scanner.next();
+      kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
+      tags = kv.getTags();
+      assertEquals(2, tags.size());
+      assertEquals("tag1", Bytes.toString(tags.get(0).getValue()));
+      assertEquals("tag2", Bytes.toString(tags.get(1).getValue()));
+
+      byte[] row4 = Bytes.toBytes("r4");
+      put = new Put(row4);
+      put.add(f, q, Bytes.toBytes("a"));
+      table.put(put);
+      append = new Append(row4);
+      append.add(new KeyValue(row4, f, q, 1234L, v, new Tag[] { new Tag((byte) 1, "tag2") }));
+      table.append(append);
+      scan = new Scan();
+      scan.setStartRow(row4);
+      scanner = table.getScanner(scan);
+      result = scanner.next();
+      kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
+      tags = kv.getTags();
+      assertEquals(1, tags.size());
+      assertEquals("tag2", Bytes.toString(tags.get(0).getValue()));
+    } finally {
+      if (table != null) {
+        table.close();
+      }
+    }
+  }
+
   private void result(byte[] fam, byte[] row, byte[] qual, byte[] row2, HTable table,
       byte[] value, byte[] value2, byte[] row1, byte[] value1) throws IOException {
     Scan s = new Scan(row);