diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index 51a95e4..a2bba45 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; import org.apache.hadoop.hbase.filter.BinaryComparator; +import org.apache.hadoop.hbase.filter.ByteArrayComparable; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; @@ -773,6 +774,19 @@ public class HTable implements HTableInterface { final byte [] qualifier, final CompareOp compareOp, final byte [] value, final Put put) throws IOException { + + return checkAndPut(row, family, qualifier, new BinaryComparator(), compareOp, value, put); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean checkAndPut(final byte [] row, final byte [] family, + final byte [] qualifier, final ByteArrayComparable comparator, + final CompareOp compareOp, final byte [] value, + final Put put) throws IOException { + RegionServerCallable callable = new RegionServerCallable(connection, getName(), row) { @Override @@ -780,11 +794,12 @@ public class HTable implements HTableInterface { PayloadCarryingRpcController controller = new PayloadCarryingRpcController(); controller.setPriority(tableName); controller.setCallTimeout(callTimeout); + comparator.setValue(value); try { CompareType compareType = CompareType.valueOf(compareOp.name()); MutateRequest request = RequestConverter.buildMutateRequest( getLocation().getRegionInfo().getRegionName(), row, family, qualifier, - new BinaryComparator(value), compareType, put); + comparator, compareType, put); MutateResponse response = getStub().mutate(controller, request); return Boolean.valueOf(response.getProcessed()); } catch (ServiceException se) { @@ -794,7 +809,7 @@ public class HTable implements HTableInterface { }; return rpcCallerFactory. newCaller().callWithRetries(callable, this.operationTimeout); } - + /** * {@inheritDoc} */ @@ -829,29 +844,41 @@ public class HTable implements HTableInterface { */ @Override public boolean checkAndDelete(final byte [] row, final byte [] family, + final byte [] qualifier, final ByteArrayComparable comparator, final CompareOp compareOp, + final byte [] value, final Delete delete) throws IOException { + RegionServerCallable callable = + new RegionServerCallable(connection, getName(), row) { + @Override + public Boolean call(int callTimeout) throws IOException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setPriority(tableName); + controller.setCallTimeout(callTimeout); + comparator.setValue(value); + try { + CompareType compareType = CompareType.valueOf(compareOp.name()); + MutateRequest request = RequestConverter.buildMutateRequest( + getLocation().getRegionInfo().getRegionName(), row, family, qualifier, + comparator, compareType, delete); + MutateResponse response = getStub().mutate(controller, request); + return Boolean.valueOf(response.getProcessed()); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); + } + } + }; + return rpcCallerFactory. newCaller(). + callWithRetries(callable,this.operationTimeout); + + } + /** + * {@inheritDoc} + */ + @Override + public boolean checkAndDelete(final byte [] row, final byte [] family, final byte [] qualifier, final CompareOp compareOp, final byte [] value, final Delete delete) throws IOException { - RegionServerCallable callable = - new RegionServerCallable(connection, getName(), row) { - @Override - public Boolean call(int callTimeout) throws IOException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setPriority(tableName); - controller.setCallTimeout(callTimeout); - try { - CompareType compareType = CompareType.valueOf(compareOp.name()); - MutateRequest request = RequestConverter.buildMutateRequest( - getLocation().getRegionInfo().getRegionName(), row, family, qualifier, - new BinaryComparator(value), compareType, delete); - MutateResponse response = getStub().mutate(controller, request); - return Boolean.valueOf(response.getProcessed()); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); - } - } - }; - return rpcCallerFactory. newCaller().callWithRetries(callable, this.operationTimeout); + return checkAndDelete(row, family, qualifier, new BinaryComparator(), compareOp, value, delete); } /** @@ -861,6 +888,18 @@ public class HTable implements HTableInterface { public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier, final CompareOp compareOp, final byte [] value, final RowMutations rm) throws IOException { + + return checkAndMutate(row, family, qualifier, new BinaryComparator(), compareOp, value, rm); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier, + final ByteArrayComparable comparator, final CompareOp compareOp, + final byte [] value, final RowMutations rm) throws IOException { + RegionServerCallable callable = new RegionServerCallable(connection, getName(), row) { @Override @@ -868,11 +907,12 @@ public class HTable implements HTableInterface { PayloadCarryingRpcController controller = rpcControllerFactory.newController(); controller.setPriority(tableName); controller.setCallTimeout(callTimeout); + comparator.setValue(value); try { CompareType compareType = CompareType.valueOf(compareOp.name()); MultiRequest request = RequestConverter.buildMutateRequest( getLocation().getRegionInfo().getRegionName(), row, family, qualifier, - new BinaryComparator(value), compareType, rm); + comparator, compareType, rm); ClientProtos.MultiResponse response = getStub().multi(controller, request); ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0); if (res.hasException()) { @@ -890,8 +930,8 @@ public class HTable implements HTableInterface { } }; return rpcCallerFactory. newCaller().callWithRetries(callable, this.operationTimeout); - } - + } + /** * {@inheritDoc} */ diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java index 3e9db00..523626e 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.coprocessor.Batch; +import org.apache.hadoop.hbase.filter.ByteArrayComparable; import org.apache.hadoop.hbase.filter.CompareFilter; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; @@ -246,6 +247,23 @@ public interface Table extends Closeable { CompareFilter.CompareOp compareOp, byte[] value, Put put) throws IOException; /** + * Atomically checks if a row/family/qualifier value matches the expected + * value, using custom comparator. If it does, it adds the put. + * If the passed value is null, the chec is for the lack of column (ie: non-existance) + * + * @param row to check + * @param family column family to check + * @param qualifier column qualifier to check + * @param comparator custom comparator + * @param compareOp comparison operator to use + * @param value the expected value + * @param put data to put if check succeeds + * @throws IOException e + * @return true if the new put was executed, false otherwise + */ + boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, ByteArrayComparable comparator, + CompareFilter.CompareOp compareOp, byte[] value, Put put) throws IOException; + /** * Deletes the specified cells/row. * * @param delete The object that specifies what to delete. @@ -301,6 +319,25 @@ public interface Table extends Closeable { CompareFilter.CompareOp compareOp, byte[] value, Delete delete) throws IOException; /** + * Atomically checks if a row/family/qualifier value matches the expected + * value, using custom comparator. If it does, it adds the delete. + * If the passed value is null, the check is for the lack of column (ie: non-existance) + * + * @param row to check + * @param family column family to check + * @param qualifier column qualifier to check + * @param comparator custom comparator + * @param compareOp comparison operator to use + * @param value the expected value + * @param delete data to delete if check succeeds + * @throws IOException e + * @return true if the new delete was executed, false otherwise + */ + boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, + ByteArrayComparable comparator, CompareFilter.CompareOp compareOp, + byte[] value, Delete delete) throws IOException; + + /** * Performs multiple mutations atomically on a single row. Currently * {@link Put} and {@link Delete} are supported. * @@ -566,4 +603,23 @@ public interface Table extends Closeable { */ boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, RowMutations mutation) throws IOException; + + /** + * Atomically checks if a row/family/qualifier value matches the expected value, using + * custom comparator. If it does, it performs the row mutations. If the passed value + * is null, the check is for the lack of column (ie: non-existence) + * + * @param row to check + * @param family column family to check + * @param qualifier column qualifier to check + * @param comparator custom comparator + * @param compareOp the comparison operator + * @param value the expected value + * @param mutation mutations to perform if check succeeds + * @throws IOException e + * @return true if the new put was executed, false otherwise + */ + boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, + ByteArrayComparable comparator, CompareFilter.CompareOp compareOp, + byte[] value, RowMutations mutation) throws IOException; } diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/filter/BinaryComparator.java hbase-client/src/main/java/org/apache/hadoop/hbase/filter/BinaryComparator.java index 0f2ed2b..8c7825e 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/filter/BinaryComparator.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/filter/BinaryComparator.java @@ -46,6 +46,13 @@ public class BinaryComparator extends ByteArrayComparable { super(value); } + /** + * Default ctor + */ + public BinaryComparator() { + super(); + } + @Override public int compareTo(byte [] value, int offset, int length) { return Bytes.compareTo(this.value, 0, this.value.length, value, offset, length); diff --git hbase-common/src/main/java/org/apache/hadoop/hbase/filter/ByteArrayComparable.java hbase-common/src/main/java/org/apache/hadoop/hbase/filter/ByteArrayComparable.java index 99f31b1..e271087 100644 --- hbase-common/src/main/java/org/apache/hadoop/hbase/filter/ByteArrayComparable.java +++ hbase-common/src/main/java/org/apache/hadoop/hbase/filter/ByteArrayComparable.java @@ -48,6 +48,21 @@ public abstract class ByteArrayComparable implements Comparable { this.value = value; } + /** + * Default ctor + */ + public ByteArrayComparable(){ + } + + /** + * Sets value + * @param value + */ + + public void setValue(byte[] value) { + this.value = value; + } + public byte[] getValue() { return value; } diff --git hbase-resource-bundle/pom.xml hbase-resource-bundle/pom.xml index 2d3a04c..f8971ab 100644 --- hbase-resource-bundle/pom.xml +++ hbase-resource-bundle/pom.xml @@ -86,8 +86,7 @@ - META-INF/LICENSE.vm - META-INF/NOTICE.vm + diff --git hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java index 172b763..36b4125 100644 --- hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java +++ hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java @@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; +import org.apache.hadoop.hbase.filter.ByteArrayComparable; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; @@ -847,4 +848,25 @@ public class RemoteHTable implements Table { CompareOp compareOp, byte[] value, RowMutations rm) throws IOException { throw new UnsupportedOperationException("checkAndMutate not implemented"); } + + @Override + public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, + ByteArrayComparable comparator, CompareOp compareOp, byte[] value, Put put) + throws IOException { + throw new UnsupportedOperationException("checkAndPut with comparator not implemented"); + } + + @Override + public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, + ByteArrayComparable comparator, CompareOp compareOp, byte[] value, Delete delete) + throws IOException { + throw new UnsupportedOperationException("checkAndDelete with comparator not implemented"); + } + + @Override + public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, + ByteArrayComparable comparator, CompareOp compareOp, byte[] value, RowMutations mutation) + throws IOException { + throw new UnsupportedOperationException("checkAndMutate with comparator not implemented"); + } } diff --git hbase-server/pom.xml hbase-server/pom.xml index 94f0b72..99aaa13 100644 --- hbase-server/pom.xml +++ hbase-server/pom.xml @@ -81,6 +81,7 @@ default +./src/main/resources false ${build.year} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java index 7865cc0..7d3a08f 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost.Environment; +import org.apache.hadoop.hbase.filter.ByteArrayComparable; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; import org.apache.hadoop.io.MultipleIOException; @@ -294,4 +295,25 @@ public final class HTableWrapper implements Table { CompareOp compareOp, byte[] value, RowMutations rm) throws IOException { return table.checkAndMutate(row, family, qualifier, compareOp, value, rm); } + + @Override + public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, + ByteArrayComparable comparator, CompareOp compareOp, byte[] value, Put put) + throws IOException { + return table.checkAndPut(row, family, qualifier, comparator, compareOp, value, put); + } + + @Override + public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, + ByteArrayComparable comparator, CompareOp compareOp, byte[] value, Delete delete) + throws IOException { + return table.checkAndDelete(row, family, qualifier, comparator, compareOp, value, delete); + } + + @Override + public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, + ByteArrayComparable comparator, CompareOp compareOp, byte[] value, RowMutations mutation) + throws IOException { + return checkAndMutate(row, family, qualifier, comparator, compareOp, value, mutation); + } } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java index 8734aea..5f81642 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java @@ -28,6 +28,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -65,6 +66,7 @@ import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; +import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.filter.CompareFilter; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; @@ -97,6 +99,7 @@ import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; @@ -4878,6 +4881,94 @@ public class TestFromClientSide { assertEquals(ok, true); } + public static class PrefixBinaryComparator extends BinaryComparator + { + int prefixSize = 2; + + @Override + public int compareTo(byte[] value, int offset, int length) { + return Bytes.compareTo(getValue(), 0, prefixSize, value, offset, prefixSize); + } + + @Override + public int compareTo(ByteBuffer value, int offset, int length) { + return -(ByteBufferUtils.compareTo(value, offset, prefixSize, + getValue(), 0, prefixSize)); + } + + @Override + public byte[] toByteArray() { + return getValue(); + } + + public static BinaryComparator parseFrom(final byte [] pbBytes) + throws DeserializationException + { + return new PrefixBinaryComparator(pbBytes); + } + + public PrefixBinaryComparator() + { + } + + public PrefixBinaryComparator(byte[] value) { + super(value); + } + + } + + @Test + public void testCheckAndDeleteWithCompareOpAndComparator() throws IOException { + final byte [] value1 = Bytes.toBytes("aaaa"); + final byte [] value2 = Bytes.toBytes("aabb"); + Table table = TEST_UTIL.createTable( + TableName.valueOf("testCheckAndDeleteWithCompareOpAndComparator"), FAMILY); + + Put put2 = new Put(ROW); + put2.addColumn(FAMILY, QUALIFIER, value2); + table.put(put2); + + Delete delete = new Delete(ROW); + delete.addColumns(FAMILY, QUALIFIER); + + // cell = "aabb", using "aacc" to compare only LESS/LESS_OR_EQUAL/NOT_EQUAL + // turns out "match" + PrefixBinaryComparator comparator = new PrefixBinaryComparator(); + + boolean ok = table.checkAndDelete(ROW, FAMILY, QUALIFIER, comparator, + CompareOp.LESS, value1, delete); + assertEquals(ok, false); + ok = table.checkAndDelete(ROW, FAMILY, QUALIFIER, comparator, CompareOp.EQUAL, value1, delete); + assertEquals(ok, true); + + } + + @Test + public void testCheckAndPutWithCompareOpAndComparator() throws IOException { + final byte [] value1 = Bytes.toBytes("aaaa"); + final byte [] value2 = Bytes.toBytes("aabb"); + Table table = TEST_UTIL.createTable( + TableName.valueOf("testCheckAndPutWithCompareOpAndComparator"), FAMILY); + + Put put2 = new Put(ROW); + put2.addColumn(FAMILY, QUALIFIER, value2); + table.put(put2); + + Put put1 = new Put(ROW); + put1.addColumn(FAMILY, QUALIFIER, value1); + + // cell = "aabb", using "aacc" to compare only LESS/LESS_OR_EQUAL/NOT_EQUAL + // turns out "match" + PrefixBinaryComparator comparator = new PrefixBinaryComparator(); + + boolean ok = table.checkAndPut(ROW, FAMILY, QUALIFIER, comparator, + CompareOp.LESS, value1, put1); + assertEquals(ok, false); + ok = table.checkAndPut(ROW, FAMILY, QUALIFIER, comparator, CompareOp.EQUAL, value1, put1); + assertEquals(ok, true); + + } + @Test public void testCheckAndDeleteWithCompareOp() throws IOException { final byte [] value1 = Bytes.toBytes("aaaa"); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/RegionAsTable.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/RegionAsTable.java index f65bc5d..c652ec1 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/RegionAsTable.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/RegionAsTable.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.coprocessor.Batch.Call; import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; +import org.apache.hadoop.hbase.filter.ByteArrayComparable; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; @@ -321,4 +322,25 @@ public class RegionAsTable implements Table { throws IOException { throw new UnsupportedOperationException(); } + + @Override + public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, + ByteArrayComparable comparator, CompareOp compareOp, byte[] value, Put put) + throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, + ByteArrayComparable comparator, CompareOp compareOp, byte[] value, Delete delete) + throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, + ByteArrayComparable comparator, CompareOp compareOp, byte[] value, RowMutations mutation) + throws IOException { + throw new UnsupportedOperationException(); + } } \ No newline at end of file diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestFIFOCompactionPolicy.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestFIFOCompactionPolicy.java index e556a58..0a6569f 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestFIFOCompactionPolicy.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestFIFOCompactionPolicy.java @@ -21,7 +21,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import java.io.IOException; -import java.util.Collection; import java.util.List; import java.util.Random; @@ -41,7 +40,6 @@ import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.Store; -import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.util.Bytes;