diff --git src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java
index d259f9f..c147e20 100644
--- src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java
+++ src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java
@@ -79,27 +79,34 @@ public class BulkDeleteEndpoint extends BaseEndpointCoprocessor implements BulkD
           }
         }
         if (deleteRows.size() > 0) {
-          Pair<Mutation, Integer>[] deleteWithLockArr = new Pair[deleteRows.size()];
+          List<Pair<Mutation, Integer>> deleteWithLockArrList =
+              new ArrayList<Pair<Mutation, Integer>>(deleteRows.size());
           int i = 0;
           for (List<KeyValue> deleteRow : deleteRows) {
             Delete delete = createDeleteMutation(deleteRow, deleteType, timestamp);
-            deleteWithLockArr[i++] = new Pair<Mutation, Integer>(delete, null);
-          }
-          OperationStatus[] opStatus = region.batchMutate(deleteWithLockArr);
-          for (i = 0; i < opStatus.length; i++) {
-            if (opStatus[i].getOperationStatusCode() != OperationStatusCode.SUCCESS) {
-              break;
+            if (delete != null) {
+              deleteWithLockArrList.add(new Pair<Mutation, Integer>(delete, null));
             }
-            totalRowsDeleted++;
-            if (deleteType == DeleteType.VERSION) {
-              byte[] versionsDeleted = deleteWithLockArr[i].getFirst().getAttribute(
-                  NO_OF_VERSIONS_TO_DELETE);
-              if (versionsDeleted != null) {
-                totalVersionsDeleted += Bytes.toInt(versionsDeleted);
+          }
+          Pair<Mutation, Integer>[] deleteWithLockArr =
+              deleteWithLockArrList.toArray(new Pair[deleteWithLockArrList.size()]);
+          if (deleteWithLockArr.length > 0) {
+            OperationStatus[] opStatus = region.batchMutate(deleteWithLockArr);
+            for (i = 0; i < opStatus.length; i++) {
+              if (opStatus[i].getOperationStatusCode() != OperationStatusCode.SUCCESS) {
+                break;
+              }
+              totalRowsDeleted++;
+              if (deleteType == DeleteType.VERSION) {
+                byte[] versionsDeleted = deleteWithLockArr[i].getFirst().getAttribute(
+                    NO_OF_VERSIONS_TO_DELETE);
+                if (versionsDeleted != null) {
+                  totalVersionsDeleted += Bytes.toInt(versionsDeleted);
+                }
               }
             }
           }
-        }
+        }
       }
     } catch (IOException ioe) {
       LOG.error(ioe);
@@ -128,6 +135,7 @@ public class BulkDeleteEndpoint extends BaseEndpointCoprocessor implements BulkD
     // We just need the rowkey. Get it from 1st KV.
     byte[] row = deleteRow.get(0).getRow();
     Delete delete = new Delete(row, ts, null);
+    int numDeletes = 0;
     if (deleteType != DeleteType.ROW) {
       switch (deleteType) {
       case DeleteType.FAMILY:
@@ -135,6 +143,7 @@ public class BulkDeleteEndpoint extends BaseEndpointCoprocessor implements BulkD
         for (KeyValue kv : deleteRow) {
           if (families.add(kv.getFamily())) {
             delete.deleteFamily(kv.getFamily(), ts);
+            numDeletes++;
           }
         }
         break;
@@ -148,6 +157,7 @@ public class BulkDeleteEndpoint extends BaseEndpointCoprocessor implements BulkD
             // Every call to deleteColumns() will add a new KV to the familymap which will finally
             // get written to the memstore as part of delete().
             delete.deleteColumns(column.family, column.qualifier, ts);
+            numDeletes++;
           }
         }
         break;
@@ -173,11 +183,16 @@ public class BulkDeleteEndpoint extends BaseEndpointCoprocessor implements BulkD
               noOfVersionsToDelete++;
             }
           }
+          numDeletes = noOfVersionsToDelete;
         }
         delete.setAttribute(NO_OF_VERSIONS_TO_DELETE, Bytes.toBytes(noOfVersionsToDelete));
       }
     }
-    return delete;
+    if (numDeletes > 0 || deleteType == DeleteType.ROW) {
+      return delete;
+    } else {
+      return null;
+    }
   }
 
   private static class Column {
@@ -207,4 +222,4 @@ public class BulkDeleteEndpoint extends BaseEndpointCoprocessor implements BulkD
       return h;
     }
   }
-}
\ No newline at end of file
+}
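
Note on the List-to-array conversion in the first hunk: the no-argument Collection.toArray() returns Object[], which cannot be cast to Pair[] at runtime, so the typed toArray(T[]) overload is what builds the array handed to batchMutate(). Below is a minimal, self-contained sketch of that difference; the Pair class here is only a stand-in for org.apache.hadoop.hbase.util.Pair, and the class and variable names are illustrative, not part of the patch.

    import java.util.ArrayList;
    import java.util.List;

    public class ToArrayDemo {
      // Stand-in for org.apache.hadoop.hbase.util.Pair, kept minimal for this sketch.
      static class Pair<T1, T2> {
        private final T1 first;
        private final T2 second;
        Pair(T1 first, T2 second) { this.first = first; this.second = second; }
        T1 getFirst() { return first; }
        T2 getSecond() { return second; }
      }

      public static void main(String[] args) {
        List<Pair<String, Integer>> deletes = new ArrayList<Pair<String, Integer>>();
        deletes.add(new Pair<String, Integer>("row1", null));

        // Would fail at runtime with ClassCastException: the no-arg toArray() returns Object[].
        // Pair<String, Integer>[] broken = (Pair<String, Integer>[]) deletes.toArray();

        // Works: the typed overload fills (or allocates) an array whose runtime type is Pair[].
        @SuppressWarnings("unchecked")
        Pair<String, Integer>[] arr = deletes.toArray(new Pair[deletes.size()]);
        System.out.println(arr.length + " pair(s), first row = " + arr[0].getFirst());
      }
    }

The reworked hunk follows the same pattern: only non-null Delete mutations are collected into the list, the list is converted with the typed toArray(T[]) overload, and batchMutate() is invoked only when the resulting array is non-empty.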