Index: src/test/java/org/apache/hadoop/hbase/regionserver/TestDeleteColumnsByPrefix.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/regionserver/TestDeleteColumnsByPrefix.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/regionserver/TestDeleteColumnsByPrefix.java (revision 0) @@ -0,0 +1,354 @@ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Tests for prefix delete markers created by Delete.deleteColumnsByPrefix, + * run against an HRegion created in the test data directory: plain and raw scans, + * flush and compaction, gets, and interaction with family and column delete markers. + */ +@Category(SmallTests.class) +public class TestDeleteColumnsByPrefix { + + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private final static String FAMILY_STR = "family"; + private final static byte[] FAMILY = Bytes.toBytes(FAMILY_STR); + private final static byte[] ROW = Bytes.toBytes("row"); + + @Test + public void testBasicCases() throws IOException { + HTableDescriptor htd = new HTableDescriptor("TestDeleteColumnsByPrefix"); + htd.addFamily(new HColumnDescriptor(FAMILY_STR)); + HRegionInfo info = new HRegionInfo(htd.getName(), null, null, false); + HRegion region = HRegion.createHRegion(info, 
TEST_UTIL. + getDataTestDir(), TEST_UTIL.getConfiguration(), htd); + + long T = EnvironmentEdgeManager.currentTimeMillis(); + Put p = new Put(ROW, T); + p.add(FAMILY, Bytes.toBytes("12"), ROW); + p.add(FAMILY, Bytes.toBytes("123"), ROW); + p.add(FAMILY, Bytes.toBytes("1234"), ROW); + p.add(FAMILY, Bytes.toBytes("12345"), ROW); + p.add(FAMILY, Bytes.toBytes("125"), ROW); + p.add(FAMILY, Bytes.toBytes("126"), ROW); + region.put(p); + + Delete d = new Delete(ROW); + d.deleteColumnsByPrefix(FAMILY, Bytes.toBytes("123"), T+1); + region.delete(d, null, false); + Scan s = new Scan(); + RegionScanner scanner = region.getScanner(s); + List<KeyValue> kvs = new ArrayList<KeyValue>(); + scanner.next(kvs); + assertEquals(3, kvs.size()); + // 123 is not a prefix of 12, 125, or 126, so these kvs are not affected + assertArrayEquals(Bytes.toBytes("12"), kvs.get(0).getQualifier()); + assertArrayEquals(Bytes.toBytes("125"), kvs.get(1).getQualifier()); + assertArrayEquals(Bytes.toBytes("126"), kvs.get(2).getQualifier()); + scanner.close(); + + p = new Put(ROW, T+2); + p.add(FAMILY, Bytes.toBytes("123"), ROW); + region.put(p); + + s = new Scan(); + scanner = region.getScanner(s); + kvs = new ArrayList<KeyValue>(); + scanner.next(kvs); + assertEquals(4, kvs.size()); + // new kv, not affected + assertArrayEquals(Bytes.toBytes("12"), kvs.get(0).getQualifier()); + assertArrayEquals(Bytes.toBytes("123"), kvs.get(1).getQualifier()); + assertArrayEquals(Bytes.toBytes("125"), kvs.get(2).getQualifier()); + assertArrayEquals(Bytes.toBytes("126"), kvs.get(3).getQualifier()); + scanner.close(); + + // flush + region.flushcache(); + scanner = region.getScanner(s); + kvs = new ArrayList<KeyValue>(); + scanner.next(kvs); + assertEquals(4, kvs.size()); + scanner.close(); + + // compact + region.compactStores(false); + scanner = region.getScanner(s); + kvs = new ArrayList<KeyValue>(); + scanner.next(kvs); + assertEquals(4, kvs.size()); + scanner.close(); + + Scan s1 = new Scan(); + s1.setRaw(true); + scanner = region.getScanner(s1); + kvs = new 
ArrayList<KeyValue>(); + scanner.next(kvs); + assertEquals(5, kvs.size()); + assertTrue(kvs.get(0).isDelete()); + assertArrayEquals(Bytes.toBytes("12"), kvs.get(1).getQualifier()); + assertArrayEquals(Bytes.toBytes("123"), kvs.get(2).getQualifier()); + assertArrayEquals(Bytes.toBytes("125"), kvs.get(3).getQualifier()); + assertArrayEquals(Bytes.toBytes("126"), kvs.get(4).getQualifier()); + scanner.close(); + + region.compactStores(true); + scanner = region.getScanner(s1); + kvs = new ArrayList<KeyValue>(); + scanner.next(kvs); + assertEquals(4, kvs.size()); + scanner.close(); + + region.close(); + region.getLog().closeAndDelete(); + } + + @Test + public void testMultiplePrefixMarkers() throws IOException { + HTableDescriptor htd = new HTableDescriptor("TestDeleteColumnsByPrefix"); + htd.addFamily(new HColumnDescriptor(FAMILY_STR)); + HRegionInfo info = new HRegionInfo(htd.getName(), null, null, false); + HRegion region = HRegion.createHRegion(info, TEST_UTIL. + getDataTestDir(), TEST_UTIL.getConfiguration(), htd); + + long T = EnvironmentEdgeManager.currentTimeMillis(); + Put p = new Put(ROW, T); + p.add(FAMILY, Bytes.toBytes("12"), ROW); + p.add(FAMILY, Bytes.toBytes("123"), ROW); + p.add(FAMILY, Bytes.toBytes("1234"), ROW); + region.put(p); + + Delete d = new Delete(ROW); + d.deleteColumnsByPrefix(FAMILY, Bytes.toBytes("123"), T+1); + region.delete(d, null, false); + + // place an earlier marker + d = new Delete(ROW); + d.deleteColumnsByPrefix(FAMILY, Bytes.toBytes("123"), T-1); + region.delete(d, null, false); + + d = new Delete(ROW); + d.deleteColumnsByPrefix(FAMILY, Bytes.toBytes("1234"), T-1); + region.delete(d, null, false); + + Scan s = new Scan(); + RegionScanner scanner = region.getScanner(s); + List<KeyValue> kvs = new ArrayList<KeyValue>(); + scanner.next(kvs); + assertEquals(1, kvs.size()); + scanner.close(); + + p = new Put(ROW, T+2); + p.add(FAMILY, Bytes.toBytes("1234"), ROW); + region.put(p); + d = new Delete(ROW); + d.deleteColumnsByPrefix(FAMILY, Bytes.toBytes("1234"), T+3); + region.delete(d, null, 
false); + s = new Scan(); + scanner = region.getScanner(s); + kvs = new ArrayList<KeyValue>(); + scanner.next(kvs); + assertEquals(1, kvs.size()); + scanner.close(); + + region.close(); + region.getLog().closeAndDelete(); + } + + /* + * Note: This test shows a bug in the code. + */ + @Test + public void testGet() throws IOException { + HTableDescriptor htd = new HTableDescriptor("TestDeleteColumnsByPrefix"); + htd.addFamily(new HColumnDescriptor(FAMILY_STR)); + HRegionInfo info = new HRegionInfo(htd.getName(), null, null, false); + HRegion region = HRegion.createHRegion(info, TEST_UTIL. + getDataTestDir(), TEST_UTIL.getConfiguration(), htd); + + long T = EnvironmentEdgeManager.currentTimeMillis(); + Put p1 = new Put(Bytes.toBytes("ro")); + p1.add(FAMILY, Bytes.toBytes(String.format("122%04d", 0)), ROW); + region.put(p1); + + Put p = new Put(ROW, T); + for (int i=0; i<10; i++) { + p.add(FAMILY, Bytes.toBytes(String.format("122%04d", i)), ROW); + } + region.put(p); + + p = new Put(ROW, T); + for (int i=0; i<100000; i++) { + p.add(FAMILY, Bytes.toBytes(String.format("123%04d", i)), ROW); + } + region.put(p); + + Delete d = new Delete(ROW); + d.deleteColumnsByPrefix(FAMILY, Bytes.toBytes("123"), T+1); + region.delete(d, null, false); + + Get g = new Get(ROW); + g.addColumn(FAMILY, Bytes.toBytes("12390000")); + Result r = region.get(g, null); + assertTrue(r.isEmpty()); + + region.close(); + region.getLog().closeAndDelete(); + } + + @Test + public void testColumnPrefixes() throws IOException { + HTableDescriptor htd = new HTableDescriptor("TestDeleteColumnsByPrefix"); + htd.addFamily(new HColumnDescriptor(FAMILY_STR)); + HRegionInfo info = new HRegionInfo(htd.getName(), null, null, false); + HRegion region = HRegion.createHRegion(info, TEST_UTIL. 
+ getDataTestDir(), TEST_UTIL.getConfiguration(), htd); + + long T = EnvironmentEdgeManager.currentTimeMillis(); + Put p = new Put(ROW, T); + p.add(FAMILY, Bytes.toBytes("12"), ROW); + p.add(FAMILY, Bytes.toBytes("12345"), ROW); + p.add(FAMILY, Bytes.toBytes("16"), ROW); // a shorter qualifier + region.put(p); + + Delete d = new Delete(ROW); + d.deleteColumnsByPrefix(FAMILY, Bytes.toBytes("123"), T+1); + region.delete(d, null, false); + + Scan s = new Scan(); + RegionScanner scanner = region.getScanner(s); + List<KeyValue> kvs = new ArrayList<KeyValue>(); + scanner.next(kvs); + assertEquals(2, kvs.size()); + assertArrayEquals(Bytes.toBytes("12"), kvs.get(0).getQualifier()); + assertArrayEquals(Bytes.toBytes("16"), kvs.get(1).getQualifier()); + scanner.close(); + + region.close(); + region.getLog().closeAndDelete(); + } + + @Test + public void testWithFamilyMarker() throws IOException { + HTableDescriptor htd = new HTableDescriptor("TestDeleteColumnsByPrefix"); + htd.addFamily(new HColumnDescriptor(FAMILY_STR)); + HRegionInfo info = new HRegionInfo(htd.getName(), null, null, false); + HRegion region = HRegion.createHRegion(info, TEST_UTIL. 
+ getDataTestDir(), TEST_UTIL.getConfiguration(), htd); + + long T = EnvironmentEdgeManager.currentTimeMillis(); + Put p = new Put(ROW, T); + p.add(FAMILY, Bytes.toBytes("12"), ROW); + p.add(FAMILY, Bytes.toBytes("123"), ROW); + p.add(FAMILY, Bytes.toBytes("1234"), ROW); + region.put(p); + + p = new Put(ROW, T+1); + p.add(FAMILY, Bytes.toBytes("12345"), ROW); + p.add(FAMILY, Bytes.toBytes("12346"), ROW); + p.add(FAMILY, Bytes.toBytes("125"), ROW); + region.put(p); + + Delete d = new Delete(ROW); + d.deleteFamily(FAMILY, T); + region.delete(d, null, false); + + d = new Delete(ROW); + d.deleteColumnsByPrefix(FAMILY, Bytes.toBytes("123"), T+1); + region.delete(d, null, false); + + Scan s = new Scan(); + RegionScanner scanner = region.getScanner(s); + List<KeyValue> kvs = new ArrayList<KeyValue>(); + scanner.next(kvs); + assertEquals(1, kvs.size()); + assertArrayEquals(Bytes.toBytes("125"), kvs.get(0).getQualifier()); + scanner.close(); + + region.close(); + region.getLog().closeAndDelete(); + } + + @Test + public void testMixedMarkers() throws IOException { + HTableDescriptor htd = new HTableDescriptor("TestDeleteColumnsByPrefix"); + htd.addFamily(new HColumnDescriptor(FAMILY_STR)); + HRegionInfo info = new HRegionInfo(htd.getName(), null, null, false); + HRegion region = HRegion.createHRegion(info, TEST_UTIL. 
+ getDataTestDir(), TEST_UTIL.getConfiguration(), htd); + + long T = EnvironmentEdgeManager.currentTimeMillis(); + Put p = new Put(ROW, T); + p.add(FAMILY, Bytes.toBytes("12"), ROW); + p.add(FAMILY, Bytes.toBytes("123"), ROW); + p.add(FAMILY, Bytes.toBytes("1234"), ROW); + p.add(FAMILY, Bytes.toBytes("12345"), ROW); + p.add(FAMILY, Bytes.toBytes("125"), ROW); + region.put(p); + + Delete d = new Delete(ROW); + d.deleteColumnsByPrefix(FAMILY, Bytes.toBytes("123"), T+1); + region.delete(d, null, false); + + // a new version of column "123" + p = new Put(ROW, T+2); + p.add(FAMILY, Bytes.toBytes("123"), ROW); + region.put(p); + + // now delete it with a column marker + d = new Delete(ROW); + d.deleteColumns(FAMILY, Bytes.toBytes("123"), T+3); + d.deleteColumns(FAMILY, Bytes.toBytes("12"), T+3); + region.delete(d, null, false); + + Scan s = new Scan(); + s.setRaw(true); + RegionScanner scanner = region.getScanner(s); + List<KeyValue> kvs = new ArrayList<KeyValue>(); + scanner.next(kvs); + scanner.next(kvs); + assertEquals(8, kvs.size()); + assertTrue(kvs.get(0).isDelete()); // delete prefix for 123 + assertTrue(kvs.get(1).isDelete()); // delete for 12 + assertArrayEquals(Bytes.toBytes("12"), kvs.get(2).getQualifier()); + assertTrue(kvs.get(3).isDelete()); // delete for 123 + assertArrayEquals(Bytes.toBytes("123"), kvs.get(4).getQualifier()); + assertArrayEquals(Bytes.toBytes("1234"), kvs.get(5).getQualifier()); + assertArrayEquals(Bytes.toBytes("12345"), kvs.get(6).getQualifier()); + assertArrayEquals(Bytes.toBytes("125"), kvs.get(7).getQualifier()); + scanner.close(); + + s = new Scan(); + scanner = region.getScanner(s); + kvs = new ArrayList<KeyValue>(); + scanner.next(kvs); + assertEquals(1, kvs.size()); + assertArrayEquals(Bytes.toBytes("125"), kvs.get(0).getQualifier()); + scanner.close(); + + region.close(); + region.getLog().closeAndDelete(); + } +} Index: src/main/java/org/apache/hadoop/hbase/client/Delete.java =================================================================== --- 
src/main/java/org/apache/hadoop/hbase/client/Delete.java (revision 1235635) +++ src/main/java/org/apache/hadoop/hbase/client/Delete.java (working copy) @@ -202,6 +202,39 @@ } /** + * Delete all versions of all columns that match the specified qualifier + * prefix. + * @param family family name + * @param qualifier column qualifier prefix to match + * @return this for invocation chaining + */ + public Delete deleteColumnsByPrefix(byte[] family, byte[] qualifier) { + this.deleteColumnsByPrefix(family, qualifier, + HConstants.LATEST_TIMESTAMP); + return this; + } + + /** + * Delete all versions of all columns that match the specified qualifier prefix + * with a timestamp less than or equal to the specified timestamp. + * @param family family name + * @param qualifier column qualifier prefix to match + * @param timestamp maximum version timestamp + * @return this for invocation chaining + */ + public Delete deleteColumnsByPrefix(byte[] family, byte[] qualifier, + long timestamp) { + List<KeyValue> list = familyMap.get(family); + if (list == null) { + list = new ArrayList<KeyValue>(); + } + list.add(new KeyValue(this.row, family, qualifier, timestamp, + KeyValue.Type.DeleteColumnPrefix)); + familyMap.put(family, list); + return this; + } + + /** + * Delete all versions of the specified column. 
* @param family family name + * @param qualifier column qualifier Index: src/main/java/org/apache/hadoop/hbase/regionserver/ScanDeleteTracker.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/ScanDeleteTracker.java (revision 1235635) +++ src/main/java/org/apache/hadoop/hbase/regionserver/ScanDeleteTracker.java (working copy) @@ -20,8 +20,14 @@ package org.apache.hadoop.hbase.regionserver; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.regionserver.DeleteTracker.DeleteResult; import org.apache.hadoop.hbase.util.Bytes; /** @@ -47,6 +53,9 @@ private int deleteLength = 0; private byte deleteType = 0; private long deleteTimestamp = 0L; + // Qualifier prefix of each DeleteColumnPrefix marker seen -> its newest timestamp. TODO: Use a better data structure here. + private Map<byte[], Long> prefixes = new TreeMap<byte[], Long>( + Bytes.BYTES_COMPARATOR); /** * Constructor for ScanDeleteTracker @@ -75,6 +84,15 @@ return; } + if (type == KeyValue.Type.DeleteColumnPrefix.getCode()) { + // TODO: This is inefficient (copies the qualifier bytes for every prefix marker) + byte[] prefix = Arrays.copyOfRange(buffer, qualifierOffset, qualifierOffset+qualifierLength); + Long previousTS = prefixes.get(prefix); + if (previousTS == null || previousTS < timestamp) { + prefixes.put(prefix, timestamp); + } + return; + } if (deleteBuffer != null && type < deleteType) { // same column, so ignore less specific delete if (Bytes.equals(deleteBuffer, deleteOffset, deleteLength, @@ -109,6 +127,22 @@ return DeleteResult.FAMILY_DELETED; } + for (Iterator<Map.Entry<byte[], Long>> i = prefixes.entrySet().iterator(); i.hasNext();) { + Map.Entry<byte[], Long> entry = i.next(); + byte[] prefix = entry.getKey(); + long deleteTS = entry.getValue(); + int ret = Bytes.compareTo(prefix, 0, prefix.length, buffer, + qualifierOffset, Math.min(prefix.length, qualifierLength)); + if (ret < 0) { + // we scanned past the prefix, safe to remove + 
i.remove(); + } else if (ret == 0 && prefix.length <= qualifierLength) { + if (timestamp <= deleteTS) { + return DeleteResult.COLUMN_DELETED; + } + } + } + if (deleteBuffer != null) { int ret = Bytes.compareTo(deleteBuffer, deleteOffset, deleteLength, buffer, qualifierOffset, qualifierLength); @@ -144,7 +178,7 @@ @Override public boolean isEmpty() { - return deleteBuffer == null && familyStamp == 0; + return deleteBuffer == null && familyStamp == 0 && prefixes.isEmpty(); } @Override @@ -152,6 +186,7 @@ public void reset() { familyStamp = 0L; deleteBuffer = null; + prefixes.clear(); } @Override Index: src/main/java/org/apache/hadoop/hbase/KeyValue.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/KeyValue.java (revision 1235635) +++ src/main/java/org/apache/hadoop/hbase/KeyValue.java (working copy) @@ -161,6 +161,7 @@ Delete((byte)8), DeleteColumn((byte)12), + DeleteColumnPrefix((byte)13), DeleteFamily((byte)14), // Maximum is used when searching; you look from maximum on down. @@ -2049,22 +2050,24 @@ } // TODO the family and qualifier should be compared separately - compare = Bytes.compareTo(left, lcolumnoffset, lcolumnlength, right, - rcolumnoffset, rcolumnlength); - if (compare != 0) { - return compare; - } - - if (!this.ignoreTimestamp) { - // Get timestamps. - long ltimestamp = Bytes.toLong(left, - loffset + (llength - TIMESTAMP_TYPE_SIZE)); - long rtimestamp = Bytes.toLong(right, - roffset + (rlength - TIMESTAMP_TYPE_SIZE)); - compare = compareTimestamps(ltimestamp, rtimestamp); + if (ltype == Type.DeleteColumnPrefix.getCode() && rtype == Type.DeleteColumnPrefix.getCode() || + ltype != Type.DeleteColumnPrefix.getCode() && rtype != Type.DeleteColumnPrefix.getCode()) { + compare = Bytes.compareTo(left, lcolumnoffset, lcolumnlength, right, + rcolumnoffset, rcolumnlength); if (compare != 0) { return compare; } + if (!this.ignoreTimestamp) { + // Get timestamps. 
+ long ltimestamp = Bytes.toLong(left, loffset + + (llength - TIMESTAMP_TYPE_SIZE)); + long rtimestamp = Bytes.toLong(right, roffset + + (rlength - TIMESTAMP_TYPE_SIZE)); + compare = compareTimestamps(ltimestamp, rtimestamp); + if (compare != 0) { + return compare; + } + } } if (!this.ignoreType) {