diff --git a/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 9dae260..38a4dc3 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -2645,13 +2645,10 @@ public class HRegion implements HConstants { if (value == null) { // Check the store (including disk) for the previous value. c = store.get(kv, 1); - if (c != null && c.size() == 1) { + if (c != null && c.size() >= 1) { // LOG.debug("Using HFile previous value for " + Bytes.toString(row) + // "/" + Bytes.toString(column)); value = c.get(0).getValue(); - } else if (c != null && c.size() > 1) { - throw new DoNotRetryIOException("more than 1 value returned in " + - "incrementColumnValue from Store"); } } diff --git a/src/java/org/apache/hadoop/hbase/thrift/Hbase.thrift b/src/java/org/apache/hadoop/hbase/thrift/Hbase.thrift index a702670..fa29635 100644 --- a/src/java/org/apache/hadoop/hbase/thrift/Hbase.thrift +++ b/src/java/org/apache/hadoop/hbase/thrift/Hbase.thrift @@ -376,7 +376,17 @@ service Hbase { * @param timestamp timestamp */ void mutateRowsTs(1:Text tableName, 2:list rowBatches, 3:i64 timestamp) - throws (1:IOError io, 2:IllegalArgument ia) + throws (1:IOError io, 2:IllegalArgument ia) + + /** + * Atomically increment the column value specified. Returns the next value post increment. + * @param tableName name of table + * @param row row to increment + * @param column name of column + * @param value amount to increment by + */ + i64 atomicIncrement(1:Text tableName, 2:Text row, 3:Text column, 4:i64 value) + throws (1:IOError io, 2:IllegalArgument ia) /** * Delete all cells that match the passed row and column. diff --git a/src/java/org/apache/hadoop/hbase/thrift/ThriftServer.java b/src/java/org/apache/hadoop/hbase/thrift/ThriftServer.java index 27f8770..55b5ec9 100644 --- a/src/java/org/apache/hadoop/hbase/thrift/ThriftServer.java +++ b/src/java/org/apache/hadoop/hbase/thrift/ThriftServer.java @@ -422,7 +422,17 @@ public class ThriftServer { throw new IllegalArgument(e.getMessage()); } } - + + public long atomicIncrement(byte[] tableName, byte[] row, byte[] column, long amount) throws IOError, IllegalArgument, TException { + HTable table; + try { + table = getTable(tableName); + return table.incrementColumnValue(row, column, amount); + } catch (IOException e) { + throw new IOError(e.getMessage()); + } + } + public void scannerClose(int id) throws IOError, IllegalArgument { LOG.debug("scannerClose: id=" + id); Scanner scanner = getScanner(id); diff --git a/src/java/org/apache/hadoop/hbase/thrift/generated/Hbase.java b/src/java/org/apache/hadoop/hbase/thrift/generated/Hbase.java index 556e877..29d7920 100644 --- a/src/java/org/apache/hadoop/hbase/thrift/generated/Hbase.java +++ b/src/java/org/apache/hadoop/hbase/thrift/generated/Hbase.java @@ -234,6 +234,8 @@ public class Hbase { */ public void mutateRowsTs(byte[] tableName, List rowBatches, long timestamp) throws IOError, IllegalArgument, TException; + public long atomicIncrement(byte[] tableName, byte[] row, byte[] column, long value) throws IOError, IllegalArgument, TException; + /** * Delete all cells that match the passed row and column. * @@ -1185,6 +1187,49 @@ public class Hbase { return; } + public long atomicIncrement(byte[] tableName, byte[] row, byte[] column, long value) throws IOError, IllegalArgument, TException + { + send_atomicIncrement(tableName, row, column, value); + return recv_atomicIncrement(); + } + + public void send_atomicIncrement(byte[] tableName, byte[] row, byte[] column, long value) throws TException + { + oprot_.writeMessageBegin(new TMessage("atomicIncrement", TMessageType.CALL, seqid_)); + atomicIncrement_args args = new atomicIncrement_args(); + args.tableName = tableName; + args.row = row; + args.column = column; + args.value = value; + args.write(oprot_); + oprot_.writeMessageEnd(); + oprot_.getTransport().flush(); + } + + public long recv_atomicIncrement() throws IOError, IllegalArgument, TException + { + TMessage msg = iprot_.readMessageBegin(); + if (msg.type == TMessageType.EXCEPTION) { + TApplicationException x = TApplicationException.read(iprot_); + iprot_.readMessageEnd(); + throw x; + } + atomicIncrement_result result = new atomicIncrement_result(); + result.read(iprot_); + iprot_.readMessageEnd(); + if (result.__isset.success) { + return result.success; + } + if (result.__isset.io) { + throw result.io; + } + if (result.__isset.ia) { + throw result.ia; + } + throw new TApplicationException(TApplicationException.MISSING_RESULT, "atomicIncrement failed: unknown result"); + } + + public void deleteAll(byte[] tableName, byte[] row, byte[] column) throws IOError, TException { send_deleteAll(tableName, row, column); @@ -1585,6 +1630,7 @@ public class Hbase { processMap_.put("mutateRowTs", new mutateRowTs()); processMap_.put("mutateRows", new mutateRows()); processMap_.put("mutateRowsTs", new mutateRowsTs()); + processMap_.put("atomicIncrement", new atomicIncrement()); processMap_.put("deleteAll", new deleteAll()); processMap_.put("deleteAllTs", new deleteAllTs()); processMap_.put("deleteAllRow", new deleteAllRow()); @@ -2116,6 +2162,32 @@ public class Hbase { } + private class atomicIncrement implements ProcessFunction { + public void process(int seqid, TProtocol iprot, TProtocol oprot) throws TException + { + atomicIncrement_args args = new atomicIncrement_args(); + args.read(iprot); + iprot.readMessageEnd(); + atomicIncrement_result result = new atomicIncrement_result(); + try { + result.success = iface_.atomicIncrement(args.tableName, args.row, args.column, args.value); + result.__isset.success = true; + } catch (IOError io) { + result.io = io; + result.__isset.io = true; + } catch (IllegalArgument ia) { + result.ia = ia; + result.__isset.ia = true; + } + oprot.writeMessageBegin(new TMessage("atomicIncrement", TMessageType.REPLY, seqid)); + result.write(oprot); + oprot.writeMessageEnd(); + oprot.getTransport().flush(); + } + + } + + private class deleteAll implements ProcessFunction { public void process(int seqid, TProtocol iprot, TProtocol oprot) throws TException { @@ -8486,9 +8558,379 @@ public class Hbase { sb.append(")"); return sb.toString(); } + } + + public static class atomicIncrement_args implements TBase, java.io.Serializable { + public byte[] tableName; + public byte[] row; + public byte[] column; + public long value; + + public final Isset __isset = new Isset(); + public static final class Isset implements java.io.Serializable { + public boolean tableName = false; + public boolean row = false; + public boolean column = false; + public boolean value = false; + + } + + public atomicIncrement_args() { + } + + public atomicIncrement_args( + byte[] tableName, + byte[] row, + byte[] column, + long value) + { + this(); + this.tableName = tableName; + this.__isset.tableName = true; + this.row = row; + this.__isset.row = true; + this.column = column; + this.__isset.column = true; + this.value = value; + this.__isset.value = true; + } + + public boolean equals(Object that) { + if (that == null) + return false; + if (that instanceof atomicIncrement_args) + return this.equals((atomicIncrement_args)that); + return false; + } + + public boolean equals(atomicIncrement_args that) { + if (that == null) + return false; + + boolean this_present_tableName = true && (this.tableName != null); + boolean that_present_tableName = true && (that.tableName != null); + if (this_present_tableName || that_present_tableName) { + if (!(this_present_tableName && that_present_tableName)) + return false; + if (!java.util.Arrays.equals(this.tableName, that.tableName)) + return false; + } + + boolean this_present_row = true && (this.row != null); + boolean that_present_row = true && (that.row != null); + if (this_present_row || that_present_row) { + if (!(this_present_row && that_present_row)) + return false; + if (!java.util.Arrays.equals(this.row, that.row)) + return false; + } + + + boolean this_present_column = true && (this.column != null); + boolean that_present_column = true && (that.column != null); + if (this_present_column || that_present_column) { + if (!(this_present_column && that_present_column)) + return false; + if (!java.util.Arrays.equals(this.column, that.column)) + return false; + } + + boolean this_present_value = true; + boolean that_present_value = true; + if (this_present_value || that_present_value) { + if (!(this_present_value && that_present_value)) + return false; + if (this.value != that.value) + return false; + } + + return true; + } + + public int hashCode() { + return 0; + } + + public void read(TProtocol iprot) throws TException { + TField field; + iprot.readStructBegin(); + while (true) + { + field = iprot.readFieldBegin(); + if (field.type == TType.STOP) { + break; + } + switch (field.id) + { + case 1: + if (field.type == TType.STRING) { + this.tableName = iprot.readBinary(); + this.__isset.tableName = true; + } else { + TProtocolUtil.skip(iprot, field.type); + } + break; + case 2: + if (field.type == TType.STRING) { + this.row = iprot.readBinary(); + this.__isset.row = true; + } else { + TProtocolUtil.skip(iprot, field.type); + } + break; + case 3: + if (field.type == TType.STRING) { + this.column = iprot.readBinary(); + this.__isset.column = true; + } else { + TProtocolUtil.skip(iprot, field.type); + } + break; + case 4: + if (field.type == TType.I64) { + this.value = iprot.readI64(); + this.__isset.value = true; + } else { + TProtocolUtil.skip(iprot, field.type); + } + break; + default: + TProtocolUtil.skip(iprot, field.type); + break; + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + } + + public void write(TProtocol oprot) throws TException { + TStruct struct = new TStruct("atomicIncrement_args"); + oprot.writeStructBegin(struct); + TField field = new TField(); + if (this.tableName != null) { + field.name = "tableName"; + field.type = TType.STRING; + field.id = 1; + oprot.writeFieldBegin(field); + oprot.writeBinary(this.tableName); + oprot.writeFieldEnd(); + } + if (this.row != null) { + field.name = "row"; + field.type = TType.STRING; + field.id = 2; + oprot.writeFieldBegin(field); + oprot.writeBinary(this.row); + oprot.writeFieldEnd(); + } + if (this.column != null) { + field.name = "column"; + field.type = TType.STRING; + field.id = 3; + oprot.writeFieldBegin(field); + oprot.writeBinary(this.column); + oprot.writeFieldEnd(); + } + field.name = "value"; + field.type = TType.I64; + field.id = 4; + oprot.writeFieldBegin(field); + oprot.writeI64(this.value); + oprot.writeFieldEnd(); + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + + public String toString() { + StringBuilder sb = new StringBuilder("atomicIncrement_args("); + sb.append("tableName:"); + sb.append(this.tableName); + sb.append(",row:"); + sb.append(this.row); + sb.append(",column:"); + sb.append(this.column); + sb.append(",value:"); + sb.append(this.value); + sb.append(")"); + return sb.toString(); + } + + } + + public static class atomicIncrement_result implements TBase, java.io.Serializable { + public long success; + public IOError io; + public IllegalArgument ia; + + public final Isset __isset = new Isset(); + public static final class Isset implements java.io.Serializable { + public boolean success = false; + public boolean io = false; + public boolean ia = false; + } + + public atomicIncrement_result() { + } + + public atomicIncrement_result( + long success, + IOError io, + IllegalArgument ia) + { + this(); + this.success = success; + this.__isset.success = true; + this.io = io; + this.__isset.io = true; + this.ia = ia; + this.__isset.ia = true; + } + + public boolean equals(Object that) { + if (that == null) + return false; + if (that instanceof atomicIncrement_result) + return this.equals((atomicIncrement_result)that); + return false; + } + + public boolean equals(atomicIncrement_result that) { + if (that == null) + return false; + + boolean this_present_success = true; + boolean that_present_success = true; + if (this_present_success || that_present_success) { + if (!(this_present_success && that_present_success)) + return false; + if (this.success != that.success) + return false; + } + + boolean this_present_io = true && (this.io != null); + boolean that_present_io = true && (that.io != null); + if (this_present_io || that_present_io) { + if (!(this_present_io && that_present_io)) + return false; + if (!this.io.equals(that.io)) + return false; + } + + boolean this_present_ia = true && (this.ia != null); + boolean that_present_ia = true && (that.ia != null); + if (this_present_ia || that_present_ia) { + if (!(this_present_ia && that_present_ia)) + return false; + if (!this.ia.equals(that.ia)) + return false; + } + + return true; + } + + public int hashCode() { + return 0; + } + + public void read(TProtocol iprot) throws TException { + TField field; + iprot.readStructBegin(); + while (true) + { + field = iprot.readFieldBegin(); + if (field.type == TType.STOP) { + break; + } + switch (field.id) + { + case 0: + if (field.type == TType.I64) { + this.success = iprot.readI64(); + this.__isset.success = true; + } else { + TProtocolUtil.skip(iprot, field.type); + } + break; + case 1: + if (field.type == TType.STRUCT) { + this.io = new IOError(); + this.io.read(iprot); + this.__isset.io = true; + } else { + TProtocolUtil.skip(iprot, field.type); + } + break; + case 2: + if (field.type == TType.STRUCT) { + this.ia = new IllegalArgument(); + this.ia.read(iprot); + this.__isset.ia = true; + } else { + TProtocolUtil.skip(iprot, field.type); + } + break; + default: + TProtocolUtil.skip(iprot, field.type); + break; + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + } + + public void write(TProtocol oprot) throws TException { + TStruct struct = new TStruct("atomicIncrement_result"); + oprot.writeStructBegin(struct); + TField field = new TField(); + + if (this.__isset.success) { + field.name = "success"; + field.type = TType.I64; + field.id = 0; + oprot.writeFieldBegin(field); + oprot.writeI64(this.success); + oprot.writeFieldEnd(); + } else if (this.__isset.io) { + if (this.io != null) { + field.name = "io"; + field.type = TType.STRUCT; + field.id = 1; + oprot.writeFieldBegin(field); + this.io.write(oprot); + oprot.writeFieldEnd(); + } + } else if (this.__isset.ia) { + if (this.ia != null) { + field.name = "ia"; + field.type = TType.STRUCT; + field.id = 2; + oprot.writeFieldBegin(field); + this.ia.write(oprot); + oprot.writeFieldEnd(); + } + } + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + + public String toString() { + StringBuilder sb = new StringBuilder("atomicIncrement_result("); + sb.append("success:"); + sb.append(this.success); + sb.append(",io:"); + sb.append(this.io.toString()); + sb.append(",ia:"); + sb.append(this.ia.toString()); + + + sb.append(")"); + return sb.toString(); + } } + + public static class deleteAll_args implements TBase, java.io.Serializable { public byte[] tableName; public byte[] row;