Index: src/main/java/org/apache/hadoop/hbase/client/coprocessor/BulkDeleteClient.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/coprocessor/BulkDeleteClient.java (revision 0)
+++ src/main/java/org/apache/hadoop/hbase/client/coprocessor/BulkDeleteClient.java (revision 0)
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client.coprocessor;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.BulkDeleteProtocol;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Client side utility to delete rows in bulk based on a scan. The scan can be range based,
+ * condition (filter) based, or a combination of both. The scan and the delete operations are
+ * performed on the region server side without fetching any data to the client side, which makes
+ * the operation efficient.
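+ * <p>
+ * The BulkDeleteEndpoint has to be loaded on the region servers hosting the table before this
+ * client can be used. One way to do this (a sketch; the unit test uses the same approach, other
+ * coprocessor loading mechanisms work as well) is to set the region coprocessor configuration
+ * key before the cluster is started:
+ *
+ * conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+ *     "org.apache.hadoop.hbase.coprocessor.BulkDeleteEndpoint");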
+ */
+public class BulkDeleteClient {
+
+  private Configuration conf;
+
+  public BulkDeleteClient(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Deletes the rows selected by the given scan from the specified table.
+   * <p>
+   * Example : Delete all the rows where the value for the column cf1:c1 equals 10
+   *
+   * byte[] tableName = Bytes.toBytes("t1");
+   * Scan scan = new Scan();
+   * SingleColumnValueFilter scvf = new SingleColumnValueFilter(Bytes.toBytes("cf1"),
+   *     Bytes.toBytes("c1"), CompareOp.EQUAL, Bytes.toBytes(10));
+   * scan.setFilter(scvf);
+   * BulkDeleteClient bulkDeleteClient = new BulkDeleteClient(conf);
+   * long noOfRowsDeleted = bulkDeleteClient.deleteRows(tableName, scan, 500);
+   *
+   * @param tableName Name of the table from which the rows are to be deleted
+   * @param scan Scan selecting the rows to be deleted
+   * @param rowBatchSize The number of rowkeys to be accumulated by the scan and deleted as one
+   *          batch
+   * @return The total number of rows deleted by the operation
+   * @throws Throwable
+   */
+  public long deleteRows(byte[] tableName, final Scan scan, final int rowBatchSize)
+      throws Throwable {
+    validateParameters(scan);
+    HTable ht = new HTable(this.conf, tableName);
+    long noOfDeletedRows = 0L;
+    try {
+      Batch.Call<BulkDeleteProtocol, Long> callable = new Batch.Call<BulkDeleteProtocol, Long>() {
+        public Long call(BulkDeleteProtocol instance) throws IOException {
+          return instance.deleteRows(scan, rowBatchSize);
+        }
+      };
+      Map<byte[], Long> result = ht.coprocessorExec(BulkDeleteProtocol.class, scan.getStartRow(),
+          scan.getStopRow(), callable);
+      for (Long rows : result.values()) {
+        noOfDeletedRows += rows;
+      }
+    } finally {
+      ht.close();
+    }
+    return noOfDeletedRows;
+  }
+
+  private void validateParameters(Scan scan) throws IOException {
+    if (scan == null
+        || (Bytes.equals(scan.getStartRow(), scan.getStopRow()) && !Bytes.equals(
+            scan.getStartRow(), HConstants.EMPTY_START_ROW))
+        || ((Bytes.compareTo(scan.getStartRow(), scan.getStopRow()) > 0) && !Bytes.equals(
+            scan.getStopRow(), HConstants.EMPTY_END_ROW))) {
+      throw new IOException("Startrow should be smaller than Stoprow");
+    }
+  }
+}
Index: src/main/java/org/apache/hadoop/hbase/coprocessor/BulkDeleteEndpoint.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/coprocessor/BulkDeleteEndpoint.java (revision 0)
+++ src/main/java/org/apache/hadoop/hbase/coprocessor/BulkDeleteEndpoint.java (revision 0)
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.BaseEndpointCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Pair;
+
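+/**
+ * Endpoint coprocessor implementation of {@link BulkDeleteProtocol}. Within every region covered
+ * by the scan it collects the rowkeys of the matching rows in batches of the requested size and
+ * deletes them with a batch mutation, so no row data leaves the region server.
+ */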
+public class BulkDeleteEndpoint extends BaseEndpointCoprocessor implements BulkDeleteProtocol {
+  @Override
+  public Long deleteRows(Scan scan, int rowBatchSize) throws IOException {
+    long totalRowsDeleted = 0L;
+    HRegion region = ((RegionCoprocessorEnvironment) getEnvironment()).getRegion();
+    // We assume here that the scan already carries the appropriate filter and the necessary
+    // column(s). There is no need to fetch all the KVs; only the rowkey is required.
+    RegionScanner scanner = region.getScanner(scan);
+    List<KeyValue> results = new ArrayList<KeyValue>();
+    boolean hasMore = false;
+    try {
+      while (true) {
+        List<byte[]> deleteRowKeys = new ArrayList<byte[]>(rowBatchSize);
+        for (int i = 0; i < rowBatchSize; i++) {
+          hasMore = scanner.next(results);
+          if (results.size() > 0) {
+            // We just need the rowkey. Get it from the first KV.
+            byte[] row = results.get(0).getRow();
+            deleteRowKeys.add(row);
+            results.clear();
+          }
+          if (!hasMore) {
+            // There are no more rows.
+            break;
+          }
+        }
+        if (deleteRowKeys.size() > 0) {
+          @SuppressWarnings("unchecked")
+          Pair<Mutation, Integer>[] deleteWithLockArr = new Pair[deleteRowKeys.size()];
+          int i = 0;
+          for (byte[] deleteRowKey : deleteRowKeys) {
+            // No explicit row lock is passed along with the Delete.
+            deleteWithLockArr[i++] = new Pair<Mutation, Integer>(new Delete(deleteRowKey), null);
+          }
+          region.batchMutate(deleteWithLockArr);
+          totalRowsDeleted += deleteRowKeys.size();
+        } else {
+          break;
+        }
+      }
+    } finally {
+      scanner.close();
+    }
+    return totalRowsDeleted;
+  }
+}
Index: src/main/java/org/apache/hadoop/hbase/coprocessor/BulkDeleteProtocol.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/coprocessor/BulkDeleteProtocol.java (revision 0)
+++ src/main/java/org/apache/hadoop/hbase/coprocessor/BulkDeleteProtocol.java (revision 0)
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
+
+/**
+ * Defines a protocol to delete rows in bulk based on a scan. The scan can be range based,
+ * condition (filter) based, or a combination of both.
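+ * <p>
+ * The endpoint is invoked once per region, typically via HTable#coprocessorExec() for the
+ * regions covered by the scan's start and stop rows; BulkDeleteClient wraps this call and sums
+ * up the per region delete counts.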
+ *
+ * @see org.apache.hadoop.hbase.client.coprocessor.BulkDeleteClient
+ */
+public interface BulkDeleteProtocol extends CoprocessorProtocol {
+  /**
+   * Deletes the rows selected by the given scan from the region on which this is invoked.
+   * @param scan Scan selecting the rows to be deleted
+   * @param rowBatchSize The number of rowkeys to be accumulated by the scan and deleted as one
+   *          batch
+   * @return The number of rows deleted from the region
+   * @throws IOException
+   */
+  Long deleteRows(Scan scan, int rowBatchSize) throws IOException;
+}
Index: src/test/java/org/apache/hadoop/hbase/coprocessor/TestBulkDeleteProtocol.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/coprocessor/TestBulkDeleteProtocol.java (revision 0)
+++ src/test/java/org/apache/hadoop/hbase/coprocessor/TestBulkDeleteProtocol.java (revision 0)
@@ -0,0 +1,128 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.coprocessor.BulkDeleteClient;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
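+/**
+ * Tests the BulkDeleteEndpoint through the BulkDeleteClient: a full table delete driven by a
+ * plain Scan and a condition based delete driven by a SingleColumnValueFilter.
+ */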
+@Category(MediumTests.class)
+public class TestBulkDeleteProtocol {
+  private static final byte[] FAMILY = Bytes.toBytes("cf1");
+  private static final byte[] QUALIFIER = Bytes.toBytes("c1");
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    TEST_UTIL.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+        "org.apache.hadoop.hbase.coprocessor.BulkDeleteEndpoint");
+    TEST_UTIL.startMiniCluster(2);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testBulkDeleteEndpoint() throws Throwable {
+    byte[] tableName = Bytes.toBytes("testBulkDeleteEndpoint");
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    HColumnDescriptor hcd = new HColumnDescriptor(FAMILY);
+    htd.addFamily(hcd);
+    TEST_UTIL.getHBaseAdmin().createTable(htd, Bytes.toBytes(0), Bytes.toBytes(120), 5);
+
+    HTable ht = new HTable(TEST_UTIL.getConfiguration(), tableName);
+    List<Put> puts = new ArrayList<Put>(100);
+    for (int j = 0; j < 100; j++) {
+      byte[] rowkey = Bytes.toBytes(j);
+      puts.add(createPut(rowkey, "v1"));
+    }
+    ht.put(puts);
+    Scan scan = new Scan();
+    // Deleting all the rows.
+    BulkDeleteClient bulkDeleteClient = new BulkDeleteClient(TEST_UTIL.getConfiguration());
+    long noOfRowsDeleted = bulkDeleteClient.deleteRows(tableName, scan, 500);
+    assertEquals(100, noOfRowsDeleted);
+
+    scan = new Scan();
+    int rows = 0;
+    for (Result result : ht.getScanner(scan)) {
+      rows++;
+    }
+    assertEquals(0, rows);
+  }
+
+  @Test
+  public void testBulkDeleteWithConditionBasedDelete() throws Throwable {
+    byte[] tableName = Bytes.toBytes("testBulkDeleteWithConditionBasedDelete");
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    HColumnDescriptor hcd = new HColumnDescriptor(FAMILY);
+    htd.addFamily(hcd);
+    TEST_UTIL.getHBaseAdmin().createTable(htd, Bytes.toBytes(0), Bytes.toBytes(120), 5);
+    HTable ht = new HTable(TEST_UTIL.getConfiguration(), tableName);
+    List<Put> puts = new ArrayList<Put>(100);
+    for (int j = 0; j < 100; j++) {
+      byte[] rowkey = Bytes.toBytes(j);
+      String value = (j % 10 == 0) ? "v1" : "v2";
+      puts.add(createPut(rowkey, value));
+    }
+    ht.put(puts);
+    Scan scan = new Scan();
+    SingleColumnValueFilter scvf = new SingleColumnValueFilter(FAMILY, QUALIFIER, CompareOp.EQUAL,
+        Bytes.toBytes("v1"));
+    scan.setFilter(scvf);
+    // Deleting only the rows where cf1:c1 has the value "v1", i.e. every 10th row.
+    BulkDeleteClient bulkDeleteClient = new BulkDeleteClient(TEST_UTIL.getConfiguration());
+    long noOfRowsDeleted = bulkDeleteClient.deleteRows(tableName, scan, 500);
+    assertEquals(10, noOfRowsDeleted);
+
+    int rows = 0;
+    for (Result result : ht.getScanner(new Scan())) {
+      rows++;
+    }
+    assertEquals(90, rows);
+  }
+
+  private Put createPut(byte[] rowkey, String value) throws IOException {
+    Put put = new Put(rowkey);
+    put.add(FAMILY, QUALIFIER, Bytes.toBytes(value));
+    return put;
+  }
+}