diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java
index 187c077..91095c8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java
@@ -25,6 +25,7 @@ import java.util.NavigableMap;
 import java.util.TreeMap;
 import java.util.UUID;
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.KeyValue;
@@ -33,6 +34,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.security.access.Permission;
 import org.apache.hadoop.hbase.security.visibility.CellVisibility;
+import org.apache.hadoop.hbase.types.NumberCodecType;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
@@ -120,6 +122,38 @@ public class Increment extends Mutation implements Comparable<Row> {
    * @return the Increment object
    */
   public Increment addColumn(byte [] family, byte [] qualifier, long amount) {
+    return addColumn(family, qualifier, amount, null);
+  }
+
+  /**
+   * Increment the column from the specific family with the specified qualifier
+   * by the specified amount.
+   * <p>
+   * Overrides previous calls to addColumn for this family and qualifier.
+   * @param family family name
+   * @param qualifier column qualifier
+   * @param amount amount to increment by
+   * @param type number codec type
+   * @return the Increment object
+   */
+  public Increment addColumn(byte [] family, byte [] qualifier, long amount, NumberCodecType type) {
+    return addColumn(family, qualifier, (Long) amount, type);
+  }
+
+  /**
+   * Increment the column from the specific family with the specified qualifier
+   * by the specified amount.
+   * <p>
+   * Overrides previous calls to addColumn for this family and qualifier.
+   * @param family family name
+   * @param qualifier column qualifier
+   * @param amount amount to increment by
+   * @param type number codec type
+   * @return the Increment object
+   */
+  @SuppressWarnings("unchecked")
+  public Increment addColumn(byte [] family, byte [] qualifier, Number amount,
+      NumberCodecType type) {
     if (family == null) {
       throw new IllegalArgumentException("family cannot be null");
     }
@@ -127,9 +161,15 @@ public class Increment extends Mutation implements Comparable<Row> {
     if (qualifier == null) {
       throw new IllegalArgumentException("qualifier cannot be null");
     }
     List<Cell> list = getCellList(family);
-    KeyValue kv = createPutKeyValue(family, qualifier, ts, Bytes.toBytes(amount));
+    KeyValue kv = createPutKeyValue(family, qualifier, ts,
+      type == null ? Bytes.toBytes(amount.longValue()) : type.encode(amount));
     list.add(kv);
     familyMap.put(CellUtil.cloneFamily(kv), list);
+    if (type != null) {
+      byte[] value = new byte[1];
+      value[0] = type.getTypeId();
+      setAttribute(getTypeAttributeName(family, qualifier), value);
+    }
     return this;
   }
@@ -219,6 +259,16 @@
     return results;
   }
 
+  public NumberCodecType getNumberCodecType(byte[] family, byte[] qualifier) {
+    byte[] value = getAttribute(getTypeAttributeName(family, qualifier));
+    NumberCodecType type = NumberCodecType.RAW_LONG; // default
+    if (value != null) {
+      assert value.length == 1;
+      type = NumberCodecType.fromTypeId(value[0]);
+    }
+    return type;
+  }
+
   /**
    * @return String
    */
@@ -254,8 +304,11 @@
       } else {
         moreThanOneB = true;
       }
-      sb.append(CellUtil.getCellKeyAsString(cell) + "+=" +
-        Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
+      NumberCodecType type = getNumberCodecType(entry.getKey(),
+        CellUtil.cloneQualifier(cell));
+      sb.append(CellUtil.getCellKeyAsString(cell) + "+=" +
+        type.decode(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
+      sb.append("(type=").append(type.name()).append(")");
     }
     sb.append("}");
@@ -339,4 +392,12 @@
   public Increment setTTL(long ttl) {
     return (Increment) super.setTTL(ttl);
   }
+
+  @VisibleForTesting
+  protected static String getTypeAttributeName(byte[] family, byte[] qualifier) {
+    String f = Bytes.toString(family);
+    String q = Bytes.toString(qualifier);
+    return f.replaceAll("\\.", "\\\\.") + "." +
+        q.replaceAll("\\.", "\\\\.") + ".type";
+  }
 }
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestIncrement.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestIncrement.java
index c38340d..a7dd2e8 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestIncrement.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestIncrement.java
@@ -24,6 +24,7 @@ import java.util.NavigableMap;
 
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.types.NumberCodecType;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -49,4 +50,22 @@ public class TestIncrement {
     }
     assertEquals(total, found);
   }
+
+  @Test
+  public void testType() {
+    assertEquals("a.b.type", Increment.getTypeAttributeName("a".getBytes(), "b".getBytes()));
+    assertEquals("\\..\\..type", Increment.getTypeAttributeName(".".getBytes(), ".".getBytes()));
+    assertEquals("a\\.\\..\\.\\.b.type",
+        Increment.getTypeAttributeName("a..".getBytes(), "..b".getBytes()));
+
+    Increment increment = new Increment("row".getBytes());
+    for (NumberCodecType type : NumberCodecType.values()) {
+      increment.addColumn("c".getBytes(), type.name().getBytes(), 1, type);
+    }
+    for (NumberCodecType type : NumberCodecType.values()) {
+      assertEquals(type, increment.getNumberCodecType("c".getBytes(), type.name().getBytes()));
+    }
+    assertEquals(NumberCodecType.RAW_LONG,
+        increment.getNumberCodecType("c".getBytes(), "invalid".getBytes()));
+  }
 }
\ No newline at end of file
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/types/NumberCodecType.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/types/NumberCodecType.java
new file mode 100644
index 0000000..03c987c
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/types/NumberCodecType.java
@@ -0,0 +1,256 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.types;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.PositionedByteRange;
+import org.apache.hadoop.hbase.util.SimplePositionedByteRange;
+import org.apache.hadoop.hbase.util.SimplePositionedMutableByteRange;
+
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public enum NumberCodecType {
+  // the id is used for serialization; don't change or remove existing values
+  RAW_BYTE((byte) 0x00),
+  RAW_SHORT((byte) 0x01),
+  RAW_INTEGER((byte) 0x02),
+  RAW_LONG((byte) 0x03),
+  RAW_FLOAT((byte) 0x04),
+  RAW_DOUBLE((byte) 0x05),
+
+  ORDERED_INT8_ASC((byte) 0x10),
+  ORDERED_INT16_ASC((byte) 0x11),
+  ORDERED_INT32_ASC((byte) 0x12),
+  ORDERED_INT64_ASC((byte) 0x13),
+  ORDERED_FLOAT32_ASC((byte) 0x14),
+  ORDERED_FLOAT64_ASC((byte) 0x15);
+
+  static Map<Byte, NumberCodecType> lookupMap;
+
+  static {
+    lookupMap = new HashMap<Byte, NumberCodecType>();
+    for (NumberCodecType e : NumberCodecType.values()) {
+      lookupMap.put(e.id, e);
+    }
+  }
+
+  private final byte id;
+
+  NumberCodecType(byte id) {
+    this.id = id;
+  }
+
+  public static NumberCodecType fromTypeId(byte id) {
+    NumberCodecType type = lookupMap.get(id);
+    if (type == null) {
+      throw new IllegalArgumentException("invalid enum type: " + id);
+    }
+    return type;
+  }
+
+  public byte getTypeId() {
+    return id;
+  }
+
+  /**
+   * Encode the value. The caller is responsible for handling conversion overflow.
+   *
+   * @param value number to encode
+   * @return encoded byte array
+   */
+  public byte[] encode(Number value) {
+    switch (this) {
+    case RAW_BYTE:
+      return new byte[] { value.byteValue() };
+    case RAW_SHORT:
+      return Bytes.toBytes(value.shortValue());
+    case RAW_INTEGER:
+      return Bytes.toBytes(value.intValue());
+    case RAW_LONG:
+      return Bytes.toBytes(value.longValue());
+    case RAW_FLOAT:
+      return Bytes.toBytes(value.floatValue());
+    case RAW_DOUBLE:
+      return Bytes.toBytes(value.doubleValue());
+    case ORDERED_INT8_ASC: {
+      OrderedInt8 codec = OrderedInt8.ASCENDING;
+      byte val = value.byteValue();
+      PositionedByteRange dst = new SimplePositionedMutableByteRange(codec.encodedLength(val));
+      codec.encode(dst, val);
+      return dst.getBytes();
+    }
+    case ORDERED_INT16_ASC: {
+      OrderedInt16 codec = OrderedInt16.ASCENDING;
+      short val = value.shortValue();
+      PositionedByteRange dst = new SimplePositionedMutableByteRange(codec.encodedLength(val));
+      codec.encode(dst, val);
+      return dst.getBytes();
+    }
+    case ORDERED_INT32_ASC: {
+      OrderedInt32 codec = OrderedInt32.ASCENDING;
+      int val = value.intValue();
+      PositionedByteRange dst = new SimplePositionedMutableByteRange(codec.encodedLength(val));
+      codec.encode(dst, val);
+      return dst.getBytes();
+    }
+    case ORDERED_INT64_ASC: {
+      OrderedInt64 codec = OrderedInt64.ASCENDING;
+      long val = value.longValue();
+      PositionedByteRange dst = new SimplePositionedMutableByteRange(codec.encodedLength(val));
+      codec.encode(dst, val);
+      return dst.getBytes();
+    }
+    case ORDERED_FLOAT32_ASC: {
+      OrderedFloat32 codec = OrderedFloat32.ASCENDING;
+      float val = value.floatValue();
+      PositionedByteRange dst = new SimplePositionedMutableByteRange(codec.encodedLength(val));
+      codec.encode(dst, val);
+      return dst.getBytes();
+    }
+    case ORDERED_FLOAT64_ASC: {
+      OrderedFloat64 codec = OrderedFloat64.ASCENDING;
+      double val = value.doubleValue();
+      PositionedByteRange dst = new SimplePositionedMutableByteRange(codec.encodedLength(val));
+      codec.encode(dst, val);
+      return dst.getBytes();
+    }
+    default:
+      throw new IllegalArgumentException("invalid type: " + this);
+    }
+  }
+
+  /**
+   * Decode the byte array.
+   *
+   * @param bytes byte array to decode
+   * @return decoded number
+   */
+  public Number decode(byte[] bytes) {
+    return decode(bytes, 0, bytes.length);
+  }
+
+  /**
+   * Decode the byte array.
+   *
+   * @param bytes bytes containing the data to decode
+   * @param offset offset in the byte array
+   * @param length length of the data to decode
+   * @return the decoded number
+   */
+  public Number decode(byte[] bytes, int offset, int length) {
+    PositionedByteRange src = new SimplePositionedByteRange(bytes, offset, length);
+    switch (this) {
+    case RAW_BYTE:
+      return src.get();
+    case RAW_SHORT:
+      return Bytes.toShort(bytes, offset, length);
+    case RAW_INTEGER:
+      return Bytes.toInt(bytes, offset, length);
+    case RAW_LONG:
+      return Bytes.toLong(bytes, offset, length);
+    case RAW_FLOAT:
+      return Bytes.toFloat(bytes, offset);
+    case RAW_DOUBLE:
+      return Bytes.toDouble(bytes, offset);
+    case ORDERED_INT8_ASC:
+      return OrderedInt8.ASCENDING.decode(src);
+    case ORDERED_INT16_ASC:
+      return OrderedInt16.ASCENDING.decode(src);
+    case ORDERED_INT32_ASC:
+      return OrderedInt32.ASCENDING.decode(src);
+    case ORDERED_INT64_ASC:
+      return OrderedInt64.ASCENDING.decode(src);
+    case ORDERED_FLOAT32_ASC:
+      return OrderedFloat32.ASCENDING.decode(src);
+    case ORDERED_FLOAT64_ASC:
+      return OrderedFloat64.ASCENDING.decode(src);
+    default:
+      throw new IllegalArgumentException("invalid type: " + this);
+    }
+  }
+
+  /**
+   * Add two numbers.
+   *
+   * @param a the first number
+   * @param b the second number
+   * @return the sum of the two numbers
+   */
+  public Number add(Number a, Number b) {
+    switch (this) {
+    case RAW_BYTE:
+    case ORDERED_INT8_ASC:
+      return a.byteValue() + b.byteValue();
+    case RAW_SHORT:
+    case ORDERED_INT16_ASC:
+      return a.shortValue() + b.shortValue();
+    case RAW_INTEGER:
+    case ORDERED_INT32_ASC:
+      return a.intValue() + b.intValue();
+    case RAW_LONG:
+    case ORDERED_INT64_ASC:
+      return a.longValue() + b.longValue();
+    case RAW_FLOAT:
+    case ORDERED_FLOAT32_ASC:
+      return a.floatValue() + b.floatValue();
+    case RAW_DOUBLE:
+    case ORDERED_FLOAT64_ASC:
+      return a.doubleValue() + b.doubleValue();
+    default:
+      throw new IllegalArgumentException("invalid type: " + this);
+    }
+  }
+
+  /**
+   * Check whether the number is zero.
+   *
+   * @param number the number to check
+   * @return true if the value is zero
+   */
+  public boolean isZero(Number number) {
+    switch (this) {
+    case RAW_BYTE:
+    case ORDERED_INT8_ASC:
+      return number.byteValue() == (byte) 0;
+    case RAW_SHORT:
+    case ORDERED_INT16_ASC:
+      return number.shortValue() == (short) 0;
+    case RAW_INTEGER:
+    case ORDERED_INT32_ASC:
+      return number.intValue() == 0;
+    case RAW_LONG:
+    case ORDERED_INT64_ASC:
+      return number.longValue() == 0L;
+    case RAW_FLOAT:
+    case ORDERED_FLOAT32_ASC:
+      return number.floatValue() == 0.0f;
+    case RAW_DOUBLE:
+    case ORDERED_FLOAT64_ASC:
+      return number.doubleValue() == 0.0;
+    default:
+      throw new IllegalArgumentException("invalid type: " + this);
+    }
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 4da0f13..9ae3797 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -160,6 +160,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
 import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
+import org.apache.hadoop.hbase.types.NumberCodecType;
 import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
@@ -7237,10 +7238,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Region {
       case INCREMENT:
         mutationType = MutationType.INCREMENT;
         // If delta amount to apply is 0, don't write WAL or MemStore.
-        long deltaAmount = getLongValue(delta);
-        apply = deltaAmount != 0;
+        NumberCodecType type = ((Increment)mutation).getNumberCodecType(columnFamily,
+            CellUtil.cloneQualifier(delta));
+        Number deltaAmount = getValue(delta, type);
+        apply = !type.isZero(deltaAmount); // also handles floating-point types
         newCell = reckonIncrement(delta, deltaAmount, currentValue, columnFamily, now,
-          (Increment)mutation);
+          (Increment)mutation, type);
         break;
       case APPEND:
         mutationType = MutationType.APPEND;
@@ -7272,21 +7275,21 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Region {
    * @return New Increment Cell with delta applied to currentValue if currentValue is not null;
    *  otherwise, a new Cell with the delta set as its value.
    */
-  private Cell reckonIncrement(final Cell delta, final long deltaAmount, final Cell currentValue,
-      byte [] columnFamily, final long now, Mutation mutation)
+  private Cell reckonIncrement(final Cell delta, final Number deltaAmount, final Cell currentValue,
+      byte [] columnFamily, final long now, Mutation mutation, NumberCodecType type)
   throws IOException {
     // Forward any tags found on the delta.
     List<Tag> tags = TagUtil.carryForwardTags(delta);
-    long newValue = deltaAmount;
+    Number newValue = deltaAmount;
     long ts = now;
     if (currentValue != null) {
       tags = TagUtil.carryForwardTags(tags, currentValue);
       ts = Math.max(now, currentValue.getTimestamp());
-      newValue += getLongValue(currentValue);
+      newValue = type.add(newValue, getValue(currentValue, type));
     }
     // Now make up the new Cell. TODO: FIX. This is carnal knowledge of how KeyValues are made...
     // doesn't work well with offheaping or if we are doing a different Cell type.
-    byte [] incrementAmountInBytes = Bytes.toBytes(newValue);
+    byte [] incrementAmountInBytes = type.encode(newValue);
     tags = TagUtil.carryForwardTTLTag(tags, mutation.getTTL());
     byte [] row = mutation.getRow();
     return new KeyValue(row, 0, row.length,
@@ -7346,15 +7349,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Region {
   }
 
   /**
-   * @return Get the long out of the passed in Cell
+   * @return the decoded value of the passed in Cell
    */
-  private static long getLongValue(final Cell cell) throws DoNotRetryIOException {
-    int len = cell.getValueLength();
-    if (len != Bytes.SIZEOF_LONG) {
-      // throw DoNotRetryIOException instead of IllegalArgumentException
-      throw new DoNotRetryIOException("Field is not a long, it's " + len + " bytes wide");
+  private static Number getValue(final Cell cell, final NumberCodecType type)
+      throws DoNotRetryIOException {
+    try {
+      // Note: do not coerce to long here; the codec determines the numeric type.
+      return type.decode(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+    } catch (Exception ex) {
+      // throw DoNotRetryIOException instead of IllegalArgumentException
+      throw new DoNotRetryIOException(
+          "Attempted to increment field that has incorrect type: " + cell, ex);
     }
-    return Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), len);
   }
 
   /**
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
index 520f210..92f5af3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
@@ -96,6 +96,7 @@ import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.types.NumberCodecType;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Pair;
@@ -3047,20 +3048,25 @@ public class TestFromClientSide {
   }
 
   static void assertIncrementKey(Cell key, byte [] row, byte [] family,
-      byte [] qualifier, long value)
-  throws Exception {
+      byte [] qualifier, long value) throws Exception {
+    assertIncrementKey(key, row, family, qualifier, value, NumberCodecType.RAW_LONG);
+  }
+
+  static void assertIncrementKey(Cell key, byte [] row, byte [] family,
+      byte [] qualifier, long value, NumberCodecType type)
+  throws Exception {
     assertTrue("Expected row [" + Bytes.toString(row) + "] " +
-      "Got row [" + Bytes.toString(CellUtil.cloneRow(key)) +"]",
+        "Got row [" + Bytes.toString(CellUtil.cloneRow(key)) +"]",
       equals(row, CellUtil.cloneRow(key)));
     assertTrue("Expected family [" + Bytes.toString(family) + "] " +
-      "Got family [" + Bytes.toString(CellUtil.cloneFamily(key)) + "]",
+        "Got family [" + Bytes.toString(CellUtil.cloneFamily(key)) + "]",
       equals(family, CellUtil.cloneFamily(key)));
     assertTrue("Expected qualifier [" + Bytes.toString(qualifier) + "] " +
-      "Got qualifier [" + Bytes.toString(CellUtil.cloneQualifier(key)) + "]",
+        "Got qualifier [" + Bytes.toString(CellUtil.cloneQualifier(key)) + "]",
       equals(qualifier, CellUtil.cloneQualifier(key)));
     assertTrue("Expected value [" + value + "] " +
-      "Got value [" + Bytes.toLong(CellUtil.cloneValue(key)) + "]",
-      Bytes.toLong(CellUtil.cloneValue(key)) == value);
+        "Got value [" + type.decode(CellUtil.cloneValue(key)) + "]",
+      Bytes.compareTo(CellUtil.cloneValue(key), type.encode(value)) == 0);
   }
 
   private void assertNumKeys(Result result, int n) throws Exception {
@@ -4405,6 +4411,59 @@ public class TestFromClientSide {
   }
 
   @Test
+  public void testIncrementWithTypes() throws Exception {
+    LOG.info("Starting testIncrementWithTypes");
+    final TableName TABLENAME = TableName.valueOf("testIncrementWithTypes");
+    HTable ht = TEST_UTIL.createTable(TABLENAME, FAMILY);
+
+    Increment inc = new Increment(ROW);
+    for (NumberCodecType type : NumberCodecType.values()) {
+      inc.addColumn(FAMILY, type.name().getBytes(), 1, type);
+    }
+    ht.increment(inc);
+
+    // Verify expected results
+    Result r = ht.get(new Get(ROW));
+    Cell [] kvs = r.rawCells();
+    assertEquals(NumberCodecType.values().length, kvs.length);
+    for (int i = 0; i < kvs.length; ++i) {
+      byte[] q = CellUtil.cloneQualifier(kvs[i]);
+      NumberCodecType type = NumberCodecType.valueOf(Bytes.toString(q));
+      assertIncrementKey(kvs[i], ROW, FAMILY, q, 1, type);
+    }
+
+    // inc by 0; the stored values should be unchanged
+    inc = new Increment(ROW);
+    for (NumberCodecType type : NumberCodecType.values()) {
+      inc.addColumn(FAMILY, type.name().getBytes(), 0, type);
+    }
+    ht.increment(inc);
+    r = ht.get(new Get(ROW));
+    kvs = r.rawCells();
+    assertEquals(NumberCodecType.values().length, kvs.length);
+    for (int i = 0; i < kvs.length; ++i) {
+      byte[] q = CellUtil.cloneQualifier(kvs[i]);
+      NumberCodecType type = NumberCodecType.valueOf(Bytes.toString(q));
+      assertIncrementKey(kvs[i], ROW, FAMILY, q, 1, type);
+    }
+
+    // inc by -2; 1 + 0 - 2 == -1
+    inc = new Increment(ROW);
+    for (NumberCodecType type : NumberCodecType.values()) {
+      inc.addColumn(FAMILY, type.name().getBytes(), -2, type);
+    }
+    ht.increment(inc);
+    r = ht.get(new Get(ROW));
+    kvs = r.rawCells();
+    assertEquals(NumberCodecType.values().length, kvs.length);
+    for (int i = 0; i < kvs.length; ++i) {
+      byte[] q = CellUtil.cloneQualifier(kvs[i]);
+      NumberCodecType type = NumberCodecType.valueOf(Bytes.toString(q));
+      assertIncrementKey(kvs[i], ROW, FAMILY, q, -1, type);
+    }
+  }
+
+  @Test
   public void testClientPoolRoundRobin() throws IOException {
     final TableName tableName = TableName.valueOf("testClientPoolRoundRobin");
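
Usage sketch (not part of the patch): a minimal, untested example of how the typed-increment API added above might be exercised from client code. The Table handle, row, and column names are hypothetical; only the APIs introduced in this patch plus standard HBase client calls are used.

import java.io.IOException;

import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.types.NumberCodecType;
import org.apache.hadoop.hbase.util.Bytes;

public class TypedIncrementExample {
  // Increments a float-typed counter and reads it back with the same codec.
  static Number incrementRatio(Table table) throws IOException {
    byte[] row = Bytes.toBytes("row1");
    byte[] fam = Bytes.toBytes("f");
    byte[] qual = Bytes.toBytes("ratio");

    Increment inc = new Increment(row);
    // Encode the delta as a raw 4-byte float instead of the default 8-byte long;
    // the codec type is recorded as a mutation attribute (see getTypeAttributeName).
    inc.addColumn(fam, qual, 1.5f, NumberCodecType.RAW_FLOAT);
    table.increment(inc);

    // Read back and decode with the codec type that was used at write time.
    Result r = table.get(new Get(row));
    return NumberCodecType.RAW_FLOAT.decode(r.getValue(fam, qual));
  }
}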