Index: src/main/java/org/apache/hadoop/hbase/client/coprocessor/BulkDeleteClient.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/coprocessor/BulkDeleteClient.java	(revision 0)
+++ src/main/java/org/apache/hadoop/hbase/client/coprocessor/BulkDeleteClient.java	(revision 0)
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client.coprocessor;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.BulkDeleteProtocol;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * This can be used to delete rows in bulk based on a scan. The scan can be range based,
+ * condition (filter) based, etc. The scan and the delete operation are done on the region
+ * server side itself, without fetching any data to the client, and so are efficient.
+ */
+public class BulkDeleteClient {
+
+  private Configuration conf;
+
+  public BulkDeleteClient(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * @param tableName
+   * @param scan
+   * @param rowBatchSize
+   *          - The number of rowkeys to be accumulated by the scan and deleted as one batch.
+   * @return The total number of rows deleted by the operation
+   * @throws Throwable
+   *
+   * <pre>
+   * Example : Delete all the rows where value for a column cf1:c1 equals 10
+   *
+   * byte[] tableName = Bytes.toBytes("t1");
+   * Scan scan = new Scan();
+   * SingleColumnValueFilter scvf = new SingleColumnValueFilter(Bytes.toBytes("cf1"),
+   *     Bytes.toBytes("c1"), CompareOp.EQUAL, Bytes.toBytes(10));
+   * scan.setFilter(scvf);
+   * BulkDeleteClient bulkDeleteClient = new BulkDeleteClient(conf);
+   * long noOfRowsDeleted = bulkDeleteClient.deleteRows(tableName, scan, 500);
+   * </pre>
+   */
+  public long deleteRows(byte[] tableName, final Scan scan, final int rowBatchSize)
+      throws Throwable {
+    validateParameters(scan);
+    HTable ht = new HTable(this.conf, tableName);
+    long noOfDeletedRows = 0L;
+    Batch.Call<BulkDeleteProtocol, Long> callable = new Batch.Call<BulkDeleteProtocol, Long>() {
+      public Long call(BulkDeleteProtocol instance) throws IOException {
+        return instance.deleteRows(scan, rowBatchSize);
+      }
+    };
+    Map<byte[], Long> result = ht.coprocessorExec(BulkDeleteProtocol.class, scan.getStartRow(),
+        scan.getStopRow(), callable);
+    for (Long rows : result.values()) {
+      noOfDeletedRows += rows;
+    }
+    return noOfDeletedRows;
+  }
+
+  private void validateParameters(Scan scan) throws IOException {
+    if (scan == null
+        || (Bytes.equals(scan.getStartRow(), scan.getStopRow()) && !Bytes.equals(
+            scan.getStartRow(), HConstants.EMPTY_START_ROW))
+        || ((Bytes.compareTo(scan.getStartRow(), scan.getStopRow()) > 0) && !Bytes.equals(
+            scan.getStopRow(), HConstants.EMPTY_END_ROW))) {
+      throw new IOException("Startrow should be smaller than Stoprow");
+    }
+  }
+}
Index: src/main/java/org/apache/hadoop/hbase/coprocessor/BulkDeleteEndpoint.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/coprocessor/BulkDeleteEndpoint.java	(revision 0)
+++ src/main/java/org/apache/hadoop/hbase/coprocessor/BulkDeleteEndpoint.java	(revision 0)
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Pair;
+
+public class BulkDeleteEndpoint extends BaseEndpointCoprocessor implements BulkDeleteProtocol {
+  @Override
+  public Long deleteRows(Scan scan, int rowBatchSize) throws IOException {
+    long totalRowsDeleted = 0L;
+    HRegion region = ((RegionCoprocessorEnvironment) getEnvironment()).getRegion();
+    // We assume here that the scan already carries the appropriate filter and the necessary
+    // column(s); there is no need to fetch all the KVs, all we need is the rowkey.
+    RegionScanner scanner = region.getScanner(scan);
+    List<KeyValue> results = new ArrayList<KeyValue>();
+    boolean hasMore = false;
+    try {
+      while (true) {
+        List<byte[]> deleteRowKeys = new ArrayList<byte[]>(rowBatchSize);
+        for (int i = 0; i < rowBatchSize; i++) {
+          hasMore = scanner.next(results);
+          if (results.size() > 0) {
+            // We just need the rowkey. Get it from the 1st KV.
+            byte[] row = results.get(0).getRow();
+            deleteRowKeys.add(row);
+            results.clear();
+          }
+          if (!hasMore) {
+            // There are no more rows.
+            break;
+          }
+        }
+        if (deleteRowKeys.size() > 0) {
+          Pair<Mutation, Integer>[] deleteWithLockArr = new Pair[deleteRowKeys.size()];
+          int i = 0;
+          for (byte[] deleteRowKey : deleteRowKeys) {
+            deleteWithLockArr[i++] = new Pair<Mutation, Integer>(new Delete(deleteRowKey), null);
+          }
+          region.batchMutate(deleteWithLockArr);
+          totalRowsDeleted += deleteRowKeys.size();
+        } else {
+          break;
+        }
+      }
+    } finally {
+      scanner.close();
+    }
+    return totalRowsDeleted;
+  }
+}
Index: src/main/java/org/apache/hadoop/hbase/coprocessor/BulkDeleteProtocol.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/coprocessor/BulkDeleteProtocol.java	(revision 0)
+++ src/main/java/org/apache/hadoop/hbase/coprocessor/BulkDeleteProtocol.java	(revision 0)
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
+
+/**
+ * Defines a protocol to delete rows in bulk based on a scan. The scan can be a range scan or a
+ * scan with conditions (filters).
+ *
+ * @see org.apache.hadoop.hbase.client.coprocessor.BulkDeleteClient
+ */
+public interface BulkDeleteProtocol extends CoprocessorProtocol {
+  /**
+   * @param scan
+   * @param rowBatchSize
+   *          - The number of rows to be accumulated by the scan and deleted as one batch.
+   * @return The total number of rows deleted within the region
+   * @throws IOException
+   */
+  Long deleteRows(Scan scan, int rowBatchSize) throws IOException;
+}
Index: src/test/java/org/apache/hadoop/hbase/coprocessor/TestBulkDeleteProtocol.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/coprocessor/TestBulkDeleteProtocol.java	(revision 0)
+++ src/test/java/org/apache/hadoop/hbase/coprocessor/TestBulkDeleteProtocol.java	(revision 0)
@@ -0,0 +1,128 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.coprocessor.BulkDeleteClient;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(MediumTests.class)
+public class TestBulkDeleteProtocol {
+  private static final byte[] FAMILY = Bytes.toBytes("cf1");
+  private static final byte[] QUALIFIER = Bytes.toBytes("c1");
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    TEST_UTIL.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+        "org.apache.hadoop.hbase.coprocessor.BulkDeleteEndpoint");
+    TEST_UTIL.startMiniCluster(2);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testBulkDeleteEndpoint() throws Throwable {
+    byte[] tableName = Bytes.toBytes("testBulkDeleteEndpoint");
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    HColumnDescriptor hcd = new HColumnDescriptor(FAMILY);
+    htd.addFamily(hcd);
+    TEST_UTIL.getHBaseAdmin().createTable(htd, Bytes.toBytes(0), Bytes.toBytes(120), 5);
+
+    HTable ht = new HTable(TEST_UTIL.getConfiguration(), tableName);
+    List<Put> puts = new ArrayList<Put>(100);
+    for (int j = 0; j < 100; j++) {
+      byte[] rowkey = Bytes.toBytes(j);
+      puts.add(createPut(rowkey, "v1"));
+    }
+    ht.put(puts);
+    Scan scan = new Scan();
+    // Delete all the rows.
+    BulkDeleteClient bulkDeleteClient = new BulkDeleteClient(TEST_UTIL.getConfiguration());
+    long noOfRowsDeleted = bulkDeleteClient.deleteRows(tableName, scan, 500);
+    assertEquals(100, noOfRowsDeleted);
+
+    scan = new Scan();
+    int rows = 0;
+    for (Result result : ht.getScanner(scan)) {
+      rows++;
+    }
+    assertEquals(0, rows);
+  }
+
+  @Test
+  public void testBulkDeleteWithConditionBasedDelete() throws Throwable {
+    byte[] tableName = Bytes.toBytes("testBulkDeleteWithConditionBasedDelete");
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    HColumnDescriptor hcd = new HColumnDescriptor(FAMILY);
+    htd.addFamily(hcd);
+    TEST_UTIL.getHBaseAdmin().createTable(htd, Bytes.toBytes(0), Bytes.toBytes(120), 5);
+    HTable ht = new HTable(TEST_UTIL.getConfiguration(), tableName);
+    List<Put> puts = new ArrayList<Put>(100);
+    for (int j = 0; j < 100; j++) {
+      byte[] rowkey = Bytes.toBytes(j);
+      String value = (j % 10 == 0) ? "v1" : "v2";
+      puts.add(createPut(rowkey, value));
+    }
+    ht.put(puts);
+    Scan scan = new Scan();
+    SingleColumnValueFilter scvf = new SingleColumnValueFilter(FAMILY, QUALIFIER,
+        CompareOp.EQUAL, Bytes.toBytes("v1"));
+    scan.setFilter(scvf);
+    // Delete only the rows where the value of cf1:c1 is "v1".
+    BulkDeleteClient bulkDeleteClient = new BulkDeleteClient(TEST_UTIL.getConfiguration());
+    long noOfRowsDeleted = bulkDeleteClient.deleteRows(tableName, scan, 500);
+    assertEquals(10, noOfRowsDeleted);
+
+    int rows = 0;
+    for (Result result : ht.getScanner(new Scan())) {
+      rows++;
+    }
+    assertEquals(90, rows);
+  }
+
+  private Put createPut(byte[] rowkey, String value) throws IOException {
+    Put put = new Put(rowkey);
+    put.add(FAMILY, QUALIFIER, value.getBytes());
+    return put;
+  }
+}
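
Usage sketch (illustrative only, not part of the patch): a minimal client-side example of a
range-based bulk delete, complementing the filter-based example in the BulkDeleteClient javadoc.
It assumes BulkDeleteEndpoint has already been loaded on the region servers via the
hbase.coprocessor.region.classes property (as the test does through
CoprocessorHost.REGION_COPROCESSOR_CONF_KEY before starting the mini cluster); the table name
"t1", the row keys and the batch size of 500 are made-up values.

    // Delete every row whose key falls in [row-0100, row-0200).
    // No data is fetched to the client; each region deletes its own slice of the range.
    Configuration conf = HBaseConfiguration.create();
    Scan scan = new Scan(Bytes.toBytes("row-0100"), Bytes.toBytes("row-0200"));
    BulkDeleteClient bulkDeleteClient = new BulkDeleteClient(conf);
    long noOfRowsDeleted = bulkDeleteClient.deleteRows(Bytes.toBytes("t1"), scan, 500);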