Index: src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java (revision 0)
+++ src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java (revision 0)
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor.example;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.BaseEndpointCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.OperationStatus;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+public class BulkDeleteEndpoint extends BaseEndpointCoprocessor implements BulkDeleteProtocol {
+  private static final String NO_OF_VERSIONS_TO_DELETE = "noOfVersionsToDelete";
+  private static final Log LOG = LogFactory.getLog(BulkDeleteEndpoint.class);
+
+  @Override
+  public BulkDeleteResponse delete(Scan scan, byte deleteType, Long timestamp,
+      int rowBatchSize) {
+    long totalRowsDeleted = 0L;
+    long totalVersionsDeleted = 0L;
+    BulkDeleteResponse response = new BulkDeleteResponse();
+    HRegion region = ((RegionCoprocessorEnvironment) getEnvironment()).getRegion();
+    boolean hasMore = true;
+    RegionScanner scanner = null;
+    if (scan.getFilter() == null && deleteType == DeleteType.ROW) {
+      // We only need the row keys, so the first KV of each row is enough.
+      // This filter can be applied only for row deletes; for the other delete types we rely on
+      // the scan to know which columns/versions are to be deleted.
+      scan.setFilter(new FirstKeyOnlyFilter());
+    }
+    // When the delete is condition based (the scan carries filters), we assume the scan already
+    // selects exactly the column(s) that need to be deleted.
+    try {
+      scanner = region.getScanner(scan);
+      while (hasMore) {
+        List<List<KeyValue>> deleteRows = new ArrayList<List<KeyValue>>(rowBatchSize);
+        for (int i = 0; i < rowBatchSize; i++) {
+          List<KeyValue> results = new ArrayList<KeyValue>();
+          hasMore = scanner.next(results);
+          if (results.size() > 0) {
+            deleteRows.add(results);
+          }
+          if (!hasMore) {
+            // There are no more rows.
+            break;
+          }
+        }
+        if (deleteRows.size() > 0) {
+          Pair<Mutation, Integer>[] deleteWithLockArr = new Pair[deleteRows.size()];
+          int i = 0;
+          for (List<KeyValue> deleteRow : deleteRows) {
+            Delete delete = createDeleteMutation(deleteRow, deleteType, timestamp);
+            deleteWithLockArr[i++] = new Pair<Mutation, Integer>(delete, null);
+          }
+          OperationStatus[] opStatus = region.batchMutate(deleteWithLockArr);
+          for (i = 0; i < opStatus.length; i++) {
+            if (opStatus[i].getOperationStatusCode() != OperationStatusCode.SUCCESS) {
+              break;
+            }
+            totalRowsDeleted++;
+            if (deleteType == DeleteType.VERSION) {
+              byte[] versionsDeleted = deleteWithLockArr[i].getFirst().getAttribute(
+                  NO_OF_VERSIONS_TO_DELETE);
+              if (versionsDeleted != null) {
+                totalVersionsDeleted += Bytes.toInt(versionsDeleted);
+              }
+            }
+          }
+        }
+      }
+    } catch (IOException ioe) {
+      LOG.error(ioe);
+      response.setIoException(ioe);
+    } finally {
+      if (scanner != null) {
+        try {
+          scanner.close();
+        } catch (IOException ioe) {
+          LOG.error(ioe);
+        }
+      }
+    }
+    response.setRowsDeleted(totalRowsDeleted);
+    response.setVersionsDeleted(totalVersionsDeleted);
+    return response;
+  }
+
+  private Delete createDeleteMutation(List<KeyValue> deleteRow, byte deleteType, Long timestamp) {
+    long ts;
+    if (timestamp == null) {
+      ts = HConstants.LATEST_TIMESTAMP;
+    } else {
+      ts = timestamp;
+    }
+    // We just need the rowkey. Get it from the 1st KV.
+    byte[] row = deleteRow.get(0).getRow();
+    Delete delete = new Delete(row, ts, null);
+    if (deleteType != DeleteType.ROW) {
+      switch (deleteType) {
+      case DeleteType.FAMILY:
+        Set<byte[]> families = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+        for (KeyValue kv : deleteRow) {
+          if (families.add(kv.getFamily())) {
+            delete.deleteFamily(kv.getFamily(), ts);
+          }
+        }
+        break;
+
+      case DeleteType.COLUMN:
+        Set<Column> columns = new HashSet<Column>();
+        for (KeyValue kv : deleteRow) {
+          Column column = new Column(kv.getFamily(), kv.getQualifier());
+          if (columns.add(column)) {
+            // Calling deleteColumns() more than once for the same cf:qualifier is not correct.
+            // Every call to deleteColumns() adds a new KV to the familymap which would finally
+            // get written to the memstore as part of delete().
+            delete.deleteColumns(column.family, column.qualifier, ts);
+          }
+        }
+        break;
+
+      case DeleteType.VERSION:
+        // When a timestamp is passed to the delete() call, only the one version of the column
+        // with that timestamp gets deleted. When no timestamp is passed, how many versions get
+        // deleted depends on the Scan being passed: every KV that the scan fetched gets deleted.
+        int noOfVersionsToDelete = 0;
+        if (timestamp == null) {
+          for (KeyValue kv : deleteRow) {
+            delete.deleteColumn(kv.getFamily(), kv.getQualifier(), kv.getTimestamp());
+            noOfVersionsToDelete++;
+          }
+        } else {
+          columns = new HashSet<Column>();
+          for (KeyValue kv : deleteRow) {
+            Column column = new Column(kv.getFamily(), kv.getQualifier());
+            // Only one version of each column gets deleted.
+            if (columns.add(column)) {
+              delete.deleteColumn(column.family, column.qualifier, ts);
+              noOfVersionsToDelete++;
+            }
+          }
+        }
+        delete.setAttribute(NO_OF_VERSIONS_TO_DELETE, Bytes.toBytes(noOfVersionsToDelete));
+      }
+    }
+    return delete;
+  }
+
+  private static class Column {
+    private byte[] family;
+    private byte[] qualifier;
+
+    public Column(byte[] family, byte[] qualifier) {
+      this.family = family;
+      this.qualifier = qualifier;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (!(other instanceof Column)) {
+        return false;
+      }
+      Column column = (Column) other;
+      return Bytes.equals(this.family, column.family)
+          && Bytes.equals(this.qualifier, column.qualifier);
+    }
+
+    @Override
+    public int hashCode() {
+      int h = 31;
+      h = h + 13 * Bytes.hashCode(this.family);
+      h = h + 13 * Bytes.hashCode(this.qualifier);
+      return h;
+    }
+  }
+}
\ No newline at end of file
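Note (not part of the patch): for orientation, the sketch below shows one way a client could drive this endpoint for a condition-based row delete; it mirrors what TestBulkDeleteProtocol does later in this patch. The table name "myTable", the family/qualifier names and the value "v1" are assumptions chosen purely for the example.

package org.apache.hadoop.hbase.coprocessor.example;

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class BulkDeleteClientSketch {
  public static void main(String[] args) throws Throwable {
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "myTable");
    // Delete every row whose cf1:c3 equals "v1"; the scan and the deletes run region-side.
    final Scan scan = new Scan();
    scan.setFilter(new SingleColumnValueFilter(Bytes.toBytes("cf1"), Bytes.toBytes("c3"),
        CompareOp.EQUAL, Bytes.toBytes("v1")));
    Batch.Call<BulkDeleteProtocol, BulkDeleteResponse> call =
        new Batch.Call<BulkDeleteProtocol, BulkDeleteResponse>() {
          public BulkDeleteResponse call(BulkDeleteProtocol instance) throws IOException {
            // ROW delete, no explicit timestamp, rows accumulated in batches of 500.
            return instance.delete(scan, BulkDeleteProtocol.DeleteType.ROW, null, 500);
          }
        };
    // One response per region touched by the scan range.
    Map<byte[], BulkDeleteResponse> perRegion = table.coprocessorExec(BulkDeleteProtocol.class,
        scan.getStartRow(), scan.getStopRow(), call);
    long rowsDeleted = 0L;
    for (BulkDeleteResponse response : perRegion.values()) {
      rowsDeleted += response.getRowsDeleted();
    }
    System.out.println("Rows deleted: " + rowsDeleted);
    table.close();
  }
}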
Index: src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteProtocol.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteProtocol.java (revision 0)
+++ src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteProtocol.java (revision 0)
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.example;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
+
+/**
+ * Defines a protocol to delete data in bulk based on a scan. The scan can be a range scan or a
+ * scan with conditions (filters).
+ * <br>
+ * Example: <code><pre>
+ * Scan scan = new Scan();
+ * // set scan properties (rowkey range, filters, timerange etc).
+ * HTable ht = ...;
+ * long noOfDeletedRows = 0L;
+ * Batch.Call<BulkDeleteProtocol, BulkDeleteResponse> callable =
+ *     new Batch.Call<BulkDeleteProtocol, BulkDeleteResponse>() {
+ *   public BulkDeleteResponse call(BulkDeleteProtocol instance) throws IOException {
+ *     return instance.delete(scan, BulkDeleteProtocol.DeleteType.ROW, timestamp, rowBatchSize);
+ *   }
+ * };
+ * Map<byte[], BulkDeleteResponse> result = ht.coprocessorExec(BulkDeleteProtocol.class,
+ *     scan.getStartRow(), scan.getStopRow(), callable);
+ * for (BulkDeleteResponse response : result.values()) {
+ *   noOfDeletedRows += response.getRowsDeleted();
+ * }
+ * </pre></code>
+ */
+public interface BulkDeleteProtocol extends CoprocessorProtocol {
+
+  public interface DeleteType {
+    /**
+     * Delete the full row.
+     */
+    byte ROW = 0;
+    /**
+     * Delete full family(s).
+     * Which family(s) get deleted is determined by the Scan: the Scan must select all the
+     * families that need to be deleted.
+     */
+    byte FAMILY = 1;
+    /**
+     * Delete full column(s).
+     * Which column(s) get deleted is determined by the Scan: the Scan must select all the
+     * qualifiers that need to be deleted.
+     */
+    byte COLUMN = 2;
+    /**
+     * Delete one or more version(s) of column(s).
+     * Which column(s) and version(s) get deleted is determined by the Scan: the Scan must select
+     * all the qualifiers and the versions that need to be deleted.
+     * When a timestamp is passed, only the one version at that timestamp gets deleted (even if
+     * the scan fetches many versions).
+     */
+    byte VERSION = 3;
+  }
+
+  /**
+   * @param scan the Scan that selects the data to be deleted
+   * @param deleteType one of the {@link DeleteType} constants
+   * @param timestamp timestamp to use for the deletes; when null, the latest timestamp is used.
+   *          For {@link DeleteType#VERSION} only the version at this timestamp is deleted.
+   * @param rowBatchSize
+   *          The number of rows which need to be accumulated by the scan and deleted as one batch
+   * @return a {@link BulkDeleteResponse} carrying the number of rows (and versions) deleted
+   */
+  BulkDeleteResponse delete(Scan scan, byte deleteType, Long timestamp, int rowBatchSize);
+}
\ No newline at end of file
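Note (not part of the patch): as the DeleteType javadoc says, the Scan is what scopes a FAMILY or COLUMN delete. A minimal sketch, with family and qualifier names chosen purely as placeholders, mirroring the tests later in this patch:

Scan scan = new Scan();
scan.addFamily(Bytes.toBytes("cf1"));                       // FAMILY: delete all of cf1 in matching rows
// or, for a COLUMN delete of a single qualifier:
scan.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("c2"));  // COLUMN: delete only cf1:c2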
Index: src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteResponse.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteResponse.java (revision 0)
+++ src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteResponse.java (revision 0)
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.example;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * Wrapper class which returns the result of the bulk delete operation performed at the server for
+ * one region. It includes the total number of rows deleted and/or any {@link IOException} that
+ * occurred while doing the operation. When the delete type is VERSION it also includes the total
+ * number of versions deleted.
+ */
+public class BulkDeleteResponse implements Serializable {
+  private static final long serialVersionUID = -8192337710525997237L;
+  private long rowsDeleted;
+  private IOException ioException;
+  private long versionsDeleted;
+
+  public BulkDeleteResponse() {
+  }
+
+  public void setRowsDeleted(long rowsDeleted) {
+    this.rowsDeleted = rowsDeleted;
+  }
+
+  public long getRowsDeleted() {
+    return rowsDeleted;
+  }
+
+  public void setIoException(IOException ioException) {
+    this.ioException = ioException;
+  }
+
+  public IOException getIoException() {
+    return ioException;
+  }
+
+  public long getVersionsDeleted() {
+    return versionsDeleted;
+  }
+
+  public void setVersionsDeleted(long versionsDeleted) {
+    this.versionsDeleted = versionsDeleted;
+  }
+}
\ No newline at end of file
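Note (not part of the patch): the endpoint never rethrows the region-side IOException; it is carried inside the response, and rowsDeleted then reflects only what succeeded before the failure. A caller that cares about partial failures could check it per region, roughly as in this sketch (it assumes the perRegion map from the earlier client sketch):

for (Map.Entry<byte[], BulkDeleteResponse> entry : perRegion.entrySet()) {
  BulkDeleteResponse response = entry.getValue();
  if (response.getIoException() != null) {
    // The bulk delete for this region stopped early; decide whether to retry or surface it.
    System.err.println("Bulk delete failed for region starting at "
        + Bytes.toStringBinary(entry.getKey()) + ": " + response.getIoException());
  }
}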
Index: src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestBulkDeleteProtocol.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestBulkDeleteProtocol.java (revision 0)
+++ src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestBulkDeleteProtocol.java (revision 0)
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.example;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.example.BulkDeleteProtocol.DeleteType;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.FilterList.Operator;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(MediumTests.class)
+public class TestBulkDeleteProtocol {
+  private static final byte[] FAMILY1 = Bytes.toBytes("cf1");
+  private static final byte[] FAMILY2 = Bytes.toBytes("cf2");
+  private static final byte[] QUALIFIER1 = Bytes.toBytes("c1");
+  private static final byte[] QUALIFIER2 = Bytes.toBytes("c2");
+  private static final byte[] QUALIFIER3 = Bytes.toBytes("c3");
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    TEST_UTIL.getConfiguration().set(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
+        BulkDeleteEndpoint.class.getName());
+    TEST_UTIL.startMiniCluster(2);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testBulkDeleteEndpoint() throws Throwable {
+    byte[] tableName = Bytes.toBytes("testBulkDeleteEndpoint");
+    HTable ht = createTable(tableName);
+    List<Put> puts = new ArrayList<Put>(100);
+    for (int j = 0; j < 100; j++) {
+      byte[] rowkey = Bytes.toBytes(j);
+      puts.add(createPut(rowkey, "v1"));
+    }
+    ht.put(puts);
+    // Deleting all the rows.
+    long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, new Scan(), 500, DeleteType.ROW,
+        null);
+    assertEquals(100, noOfRowsDeleted);
+
+    int rows = 0;
+    for (Result result : ht.getScanner(new Scan())) {
+      rows++;
+    }
+    assertEquals(0, rows);
+  }
+
+  @Test
+  public void testBulkDeleteEndpointWhenRowBatchSizeLessThanRowsToDeleteFromARegion()
+      throws Throwable {
+    byte[] tableName = Bytes
+        .toBytes("testBulkDeleteEndpointWhenRowBatchSizeLessThanRowsToDeleteFromARegion");
+    HTable ht = createTable(tableName);
+    List<Put> puts = new ArrayList<Put>(100);
+    for (int j = 0; j < 100; j++) {
+      byte[] rowkey = Bytes.toBytes(j);
+      puts.add(createPut(rowkey, "v1"));
+    }
+    ht.put(puts);
+    // Deleting all the rows, with a batch size smaller than the number of rows per region.
+    long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, new Scan(), 10, DeleteType.ROW,
+        null);
+    assertEquals(100, noOfRowsDeleted);
+
+    int rows = 0;
+    for (Result result : ht.getScanner(new Scan())) {
+      rows++;
+    }
+    assertEquals(0, rows);
+  }
+
+  private long invokeBulkDeleteProtocol(byte[] tableName, final Scan scan, final int rowBatchSize,
+      final byte deleteType, final Long timeStamp) throws Throwable {
+    HTable ht = new HTable(TEST_UTIL.getConfiguration(), tableName);
+    long noOfDeletedRows = 0L;
+    Batch.Call<BulkDeleteProtocol, BulkDeleteResponse> callable =
+        new Batch.Call<BulkDeleteProtocol, BulkDeleteResponse>() {
+          public BulkDeleteResponse call(BulkDeleteProtocol instance) throws IOException {
+            return instance.delete(scan, deleteType, timeStamp, rowBatchSize);
+          }
+        };
+    Map<byte[], BulkDeleteResponse> result = ht.coprocessorExec(BulkDeleteProtocol.class,
+        scan.getStartRow(), scan.getStopRow(), callable);
+    for (BulkDeleteResponse response : result.values()) {
+      noOfDeletedRows += response.getRowsDeleted();
+    }
+    return noOfDeletedRows;
+  }
+
+  @Test
+  public void testBulkDeleteWithConditionBasedDelete() throws Throwable {
+    byte[] tableName = Bytes.toBytes("testBulkDeleteWithConditionBasedDelete");
+    HTable ht = createTable(tableName);
+    List<Put> puts = new ArrayList<Put>(100);
+    for (int j = 0; j < 100; j++) {
+      byte[] rowkey = Bytes.toBytes(j);
+      String value = (j % 10 == 0) ? "v1" : "v2";
+      puts.add(createPut(rowkey, value));
+    }
+    ht.put(puts);
+    Scan scan = new Scan();
+    FilterList fl = new FilterList(Operator.MUST_PASS_ALL);
+    SingleColumnValueFilter scvf = new SingleColumnValueFilter(FAMILY1, QUALIFIER3,
+        CompareOp.EQUAL, Bytes.toBytes("v1"));
+    fl.addFilter(scvf);
+    scan.setFilter(fl);
+    // Deleting all the rows where cf1:c3 = v1
+    long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, scan, 500, DeleteType.ROW, null);
+    assertEquals(10, noOfRowsDeleted);
+
+    int rows = 0;
+    for (Result result : ht.getScanner(new Scan())) {
+      rows++;
+    }
+    assertEquals(90, rows);
+  }
+
+  @Test
+  public void testBulkDeleteColumn() throws Throwable {
+    byte[] tableName = Bytes.toBytes("testBulkDeleteColumn");
+    HTable ht = createTable(tableName);
+    List<Put> puts = new ArrayList<Put>(100);
+    for (int j = 0; j < 100; j++) {
+      byte[] rowkey = Bytes.toBytes(j);
+      String value = (j % 10 == 0) ? "v1" : "v2";
"v1" : "v2"; + puts.add(createPut(rowkey, value)); + } + ht.put(puts); + Scan scan = new Scan (); + scan.addColumn(FAMILY1, QUALIFIER2); + // Delete the column cf1:col2 + long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, scan, 500, DeleteType.COLUMN, null); + assertEquals(100, noOfRowsDeleted); + + int rows = 0; + for (Result result : ht.getScanner(new Scan())) { + assertEquals(2, result.getFamilyMap(FAMILY1).size()); + assertTrue(result.getColumn(FAMILY1, QUALIFIER2).isEmpty()); + assertEquals(1, result.getColumn(FAMILY1, QUALIFIER1).size()); + assertEquals(1, result.getColumn(FAMILY1, QUALIFIER3).size()); + rows++; + } + assertEquals(100, rows); + } + + @Test + public void testBulkDeleteFamily() throws Throwable { + byte[] tableName = Bytes.toBytes("testBulkDeleteFamily"); + HTableDescriptor htd = new HTableDescriptor(tableName); + htd.addFamily(new HColumnDescriptor(FAMILY1)); + htd.addFamily(new HColumnDescriptor(FAMILY2)); + TEST_UTIL.getHBaseAdmin().createTable(htd, Bytes.toBytes(0), Bytes.toBytes(120), 5); + HTable ht = new HTable(TEST_UTIL.getConfiguration(), tableName); + List puts = new ArrayList(100); + for (int j = 0; j < 100; j++) { + Put put = new Put(Bytes.toBytes(j)); + put.add(FAMILY1, QUALIFIER1, "v1".getBytes()); + put.add(FAMILY2, QUALIFIER2, "v2".getBytes()); + puts.add(put); + } + ht.put(puts); + Scan scan = new Scan (); + scan.addFamily(FAMILY1); + // Delete the column family cf1 + long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, scan, 500, DeleteType.FAMILY, null); + assertEquals(100, noOfRowsDeleted); + int rows = 0; + for (Result result : ht.getScanner(new Scan())) { + assertTrue(result.getFamilyMap(FAMILY1).isEmpty()); + assertEquals(1, result.getColumn(FAMILY2, QUALIFIER2).size()); + rows++; + } + assertEquals(100, rows); + } + + @Test + public void testBulkDeleteColumnVersion() throws Throwable { + byte[] tableName = Bytes.toBytes("testBulkDeleteColumnVersion"); + HTable ht = createTable(tableName); + List puts = new ArrayList(100); + for (int j = 0; j < 100; j++) { + Put put = new Put(Bytes.toBytes(j)); + byte[] value = "v1".getBytes(); + put.add(FAMILY1, QUALIFIER1, 1234L, value); + put.add(FAMILY1, QUALIFIER2, 1234L, value); + put.add(FAMILY1, QUALIFIER3, 1234L, value); + // Latest version values + value = "v2".getBytes(); + put.add(FAMILY1, QUALIFIER1, value); + put.add(FAMILY1, QUALIFIER2, value); + put.add(FAMILY1, QUALIFIER3, value); + put.add(FAMILY1, null, value); + puts.add(put); + } + ht.put(puts); + Scan scan = new Scan (); + scan.addFamily(FAMILY1); + // Delete the latest version values of all the columns in family cf1. 
+    long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, scan, 500, DeleteType.VERSION,
+        HConstants.LATEST_TIMESTAMP);
+    assertEquals(100, noOfRowsDeleted);
+    int rows = 0;
+    scan = new Scan();
+    scan.setMaxVersions();
+    for (Result result : ht.getScanner(scan)) {
+      assertEquals(3, result.getFamilyMap(FAMILY1).size());
+      List<KeyValue> column = result.getColumn(FAMILY1, QUALIFIER1);
+      assertEquals(1, column.size());
+      assertTrue(Bytes.equals("v1".getBytes(), column.get(0).getValue()));
+
+      column = result.getColumn(FAMILY1, QUALIFIER2);
+      assertEquals(1, column.size());
+      assertTrue(Bytes.equals("v1".getBytes(), column.get(0).getValue()));
+
+      column = result.getColumn(FAMILY1, QUALIFIER3);
+      assertEquals(1, column.size());
+      assertTrue(Bytes.equals("v1".getBytes(), column.get(0).getValue()));
+      rows++;
+    }
+    assertEquals(100, rows);
+  }
+
+  @Test
+  public void testBulkDeleteColumnVersionBasedOnTS() throws Throwable {
+    byte[] tableName = Bytes.toBytes("testBulkDeleteColumnVersionBasedOnTS");
+    HTable ht = createTable(tableName);
+    List<Put> puts = new ArrayList<Put>(100);
+    for (int j = 0; j < 100; j++) {
+      Put put = new Put(Bytes.toBytes(j));
+      // TS = 1000L
+      byte[] value = "v1".getBytes();
+      put.add(FAMILY1, QUALIFIER1, 1000L, value);
+      put.add(FAMILY1, QUALIFIER2, 1000L, value);
+      put.add(FAMILY1, QUALIFIER3, 1000L, value);
+      // TS = 1234L
+      value = "v2".getBytes();
+      put.add(FAMILY1, QUALIFIER1, 1234L, value);
+      put.add(FAMILY1, QUALIFIER2, 1234L, value);
+      put.add(FAMILY1, QUALIFIER3, 1234L, value);
+      // Latest version values
+      value = "v3".getBytes();
+      put.add(FAMILY1, QUALIFIER1, value);
+      put.add(FAMILY1, QUALIFIER2, value);
+      put.add(FAMILY1, QUALIFIER3, value);
+      puts.add(put);
+    }
+    ht.put(puts);
+    Scan scan = new Scan();
+    scan.addColumn(FAMILY1, QUALIFIER3);
+    // Delete the version of column cf1:c3 at TS=1234
+    long noOfRowsDeleted = invokeBulkDeleteProtocol(tableName, scan, 500, DeleteType.VERSION,
+        1234L);
+    assertEquals(100, noOfRowsDeleted);
+    int rows = 0;
+    scan = new Scan();
+    scan.setMaxVersions();
+    for (Result result : ht.getScanner(scan)) {
+      assertEquals(3, result.getFamilyMap(FAMILY1).size());
+      assertEquals(3, result.getColumn(FAMILY1, QUALIFIER1).size());
+      assertEquals(3, result.getColumn(FAMILY1, QUALIFIER2).size());
+      List<KeyValue> column = result.getColumn(FAMILY1, QUALIFIER3);
+      assertEquals(2, column.size());
+      assertTrue(Bytes.equals("v3".getBytes(), column.get(0).getValue()));
+      assertTrue(Bytes.equals("v1".getBytes(), column.get(1).getValue()));
+      rows++;
+    }
+    assertEquals(100, rows);
+  }
+
+  @Test
+  public void testBulkDeleteWithNumberOfVersions() throws Throwable {
+    byte[] tableName = Bytes.toBytes("testBulkDeleteWithNumberOfVersions");
+    HTable ht = createTable(tableName);
+    List<Put> puts = new ArrayList<Put>(100);
+    for (int j = 0; j < 100; j++) {
+      Put put = new Put(Bytes.toBytes(j));
+      // TS = 1000L
+      byte[] value = "v1".getBytes();
+      put.add(FAMILY1, QUALIFIER1, 1000L, value);
+      put.add(FAMILY1, QUALIFIER2, 1000L, value);
+      put.add(FAMILY1, QUALIFIER3, 1000L, value);
+      // TS = 1234L
+      value = "v2".getBytes();
+      put.add(FAMILY1, QUALIFIER1, 1234L, value);
+      put.add(FAMILY1, QUALIFIER2, 1234L, value);
+      put.add(FAMILY1, QUALIFIER3, 1234L, value);
+      // TS = 2000L
+      value = "v3".getBytes();
+      put.add(FAMILY1, QUALIFIER1, 2000L, value);
+      put.add(FAMILY1, QUALIFIER2, 2000L, value);
+      put.add(FAMILY1, QUALIFIER3, 2000L, value);
+      // Latest version values
+      value = "v4".getBytes();
+      put.add(FAMILY1, QUALIFIER1, value);
+      put.add(FAMILY1, QUALIFIER2, value);
+      put.add(FAMILY1, QUALIFIER3, value);
+      puts.add(put);
+    }
+    ht.put(puts);
+
+    // Delete all the versions of columns cf1:c1 and cf1:c2 falling within the time range
+    // [1000, 2000)
+    final Scan scan = new Scan();
+    scan.addColumn(FAMILY1, QUALIFIER1);
+    scan.addColumn(FAMILY1, QUALIFIER2);
+    scan.setTimeRange(1000L, 2000L);
+    scan.setMaxVersions();
+
+    long noOfDeletedRows = 0L;
+    long noOfVersionsDeleted = 0L;
+    Batch.Call<BulkDeleteProtocol, BulkDeleteResponse> callable =
+        new Batch.Call<BulkDeleteProtocol, BulkDeleteResponse>() {
+          public BulkDeleteResponse call(BulkDeleteProtocol instance) throws IOException {
+            return instance.delete(scan, DeleteType.VERSION, null, 500);
+          }
+        };
+    Map<byte[], BulkDeleteResponse> result = ht.coprocessorExec(BulkDeleteProtocol.class,
+        scan.getStartRow(), scan.getStopRow(), callable);
+    for (BulkDeleteResponse response : result.values()) {
+      noOfDeletedRows += response.getRowsDeleted();
+      noOfVersionsDeleted += response.getVersionsDeleted();
+    }
+    assertEquals(100, noOfDeletedRows);
+    assertEquals(400, noOfVersionsDeleted);
+
+    int rows = 0;
+    Scan scan1 = new Scan();
+    scan1.setMaxVersions();
+    for (Result res : ht.getScanner(scan1)) {
+      assertEquals(3, res.getFamilyMap(FAMILY1).size());
+      List<KeyValue> column = res.getColumn(FAMILY1, QUALIFIER1);
+      assertEquals(2, column.size());
+      assertTrue(Bytes.equals("v4".getBytes(), column.get(0).getValue()));
+      assertTrue(Bytes.equals("v3".getBytes(), column.get(1).getValue()));
+      column = res.getColumn(FAMILY1, QUALIFIER2);
+      assertEquals(2, column.size());
+      assertTrue(Bytes.equals("v4".getBytes(), column.get(0).getValue()));
+      assertTrue(Bytes.equals("v3".getBytes(), column.get(1).getValue()));
+      assertEquals(4, res.getColumn(FAMILY1, QUALIFIER3).size());
+      rows++;
+    }
+    assertEquals(100, rows);
+  }
+
+  private HTable createTable(byte[] tableName) throws IOException {
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    HColumnDescriptor hcd = new HColumnDescriptor(FAMILY1);
+    hcd.setMaxVersions(10); // 10 versions is enough for these tests.
+    htd.addFamily(hcd);
+    TEST_UTIL.getHBaseAdmin().createTable(htd, Bytes.toBytes(0), Bytes.toBytes(120), 5);
+    HTable ht = new HTable(TEST_UTIL.getConfiguration(), tableName);
+    return ht;
+  }
+
+  private Put createPut(byte[] rowkey, String value) throws IOException {
+    Put put = new Put(rowkey);
+    put.add(FAMILY1, QUALIFIER1, value.getBytes());
+    put.add(FAMILY1, QUALIFIER2, value.getBytes());
+    put.add(FAMILY1, QUALIFIER3, value.getBytes());
+    return put;
+  }
+}
\ No newline at end of file
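Closing note (not part of the patch): before any of this works, the endpoint class has to be loaded on the region servers. The test's setupBeforeClass() above does it through the configuration key shown below; a rough sketch for a standalone setup, with the property name taken from the test and everything else purely illustrative:

// Same mechanism the test uses: load the endpoint on every user region.
Configuration conf = HBaseConfiguration.create();
conf.set(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
    BulkDeleteEndpoint.class.getName());
// Alternatively, the equivalent property can be set in hbase-site.xml on the region servers,
// provided the BulkDeleteEndpoint class is on their classpath.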