From 9ce5fdef8f41ddb918d40fb42e22564bcef86f27 Mon Sep 17 00:00:00 2001
From: aaraujo
Date: Wed, 11 Nov 2015 16:37:53 -0600
Subject: [PATCH] HBASE-14791 Optionally batch Deletes in MapReduce jobs

MapReduce jobs like CopyTable can take considerably longer to copy data
when the source contains Deletes: unlike Puts, Deletes are not batched,
so each one is sent to the destination in its own RPC.

Added BufferedHTable for MapReduce OutputFormats that write to HBase.
Buffering is disabled by default. When enabled, Deletes are buffered and
flushed in a single RPC once the configured write buffer size is reached.
---
 .../hadoop/hbase/mapreduce/BufferedHTable.java     | 352 +++++++++++++++++++++
 .../hbase/mapreduce/MultiTableOutputFormat.java    |  14 +-
 .../hadoop/hbase/mapreduce/TableOutputFormat.java  |  10 +-
 .../hadoop/hbase/mapreduce/TestBufferedHTable.java | 126 ++++++++
 4 files changed, 490 insertions(+), 12 deletions(-)
 create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/BufferedHTable.java
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestBufferedHTable.java
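Usage sketch: a job driver opts in through the job configuration before the
OutputFormat opens its table. The job and table names below are hypothetical;
`hbase.mapred.bufferdeletes` is the `BufferedHTable.BUFFER_DELETES` key this
patch introduces (the class is package-private, so external drivers must use
the literal string).

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.mapreduce.Job;

public class BufferedDeleteJobDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Opt in to Delete buffering; the default remains off.
    conf.setBoolean("hbase.mapred.bufferdeletes", true);
    // Standard TableOutputFormat wiring; with this patch, the record writer
    // creates a BufferedHTable instead of a plain HTable.
    conf.set(TableOutputFormat.OUTPUT_TABLE, "dest_table");
    Job job = Job.getInstance(conf, "copy-with-buffered-deletes");
    job.setOutputFormatClass(TableOutputFormat.class);
    // ... mapper/reducer setup elided ...
  }
}
```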
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/BufferedHTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/BufferedHTable.java
new file mode 100644
index 0000000..43d75a3
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/BufferedHTable.java
@@ -0,0 +1,352 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Row;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Message;
+import com.google.protobuf.Service;
+
+/**
+ * Buffers writes for HTableInterface operations that are not buffered by {@code HTable},
+ * such as {@code Delete} operations; buffering them can significantly speed up MapReduce
+ * jobs. Buffering is off by default; unless it is enabled, all calls are simply delegated
+ * to {@code HTable}.
+ */
+class BufferedHTable implements HTableInterface {
+
+  /** Job parameter that specifies whether to buffer deletes. */
+  public static final String BUFFER_DELETES = "hbase.mapred.bufferdeletes";
+
+  /** Whether to buffer deletes (defaults to false, or off). */
+  private boolean bufferDeletes = false;
+
+  private HTableInterface delegate;
+  private boolean closed = false;
+  protected List<Delete> bufferedDeletes = new LinkedList<Delete>();
+  private long currentDeleteBufferSize;
+
+  public BufferedHTable(Configuration conf, String tableName) throws IOException {
+    this.delegate = new HTable(conf, tableName);
+    this.bufferDeletes = conf.getBoolean(BUFFER_DELETES, false);
+  }
+
+  public BufferedHTable(Configuration conf, byte[] tableName) throws IOException {
+    this.delegate = new HTable(conf, tableName);
+    this.bufferDeletes = conf.getBoolean(BUFFER_DELETES, false);
+  }
+
+  @VisibleForTesting
+  BufferedHTable(Configuration conf, HTableInterface delegate) {
+    this.delegate = delegate;
+    this.bufferDeletes = conf.getBoolean(BUFFER_DELETES, false);
+  }
+
+  @Override
+  public void delete(Delete delete) throws IOException {
+    if (this.bufferDeletes) {
+      doDelete(delete);
+      return;
+    }
+    delegate.delete(delete);
+  }
+
+  @Override
+  public void delete(List<Delete> deletes) throws IOException {
+    if (this.bufferDeletes) {
+      for (Delete delete : deletes) {
+        doDelete(delete);
+      }
+      return;
+    }
+    delegate.delete(deletes);
+  }
+
+  @Override
+  public void setWriteBufferSize(long writeBufferSize) throws IOException {
+    delegate.setWriteBufferSize(writeBufferSize);
+    if (this.currentDeleteBufferSize > writeBufferSize) {
+      flushDeletes();
+    }
+  }
+
+  private void doDelete(Delete delete) throws IOException {
+    if (this.closed) {
+      throw new IllegalStateException("BufferedHTable was closed");
+    }
+
+    this.currentDeleteBufferSize += delete.heapSize();
+    this.bufferedDeletes.add(delete);
+
+    // flush when the delete buffer exceeds the configured write buffer size
+    if (this.currentDeleteBufferSize > getWriteBufferSize()) {
+      flushDeletes();
+    }
+  }
+
+  private void flushDeletes() throws IOException {
+    if (this.currentDeleteBufferSize > 0) {
+      // batch deletes into a single RPC
+      delegate.delete(bufferedDeletes);
+    }
+    this.bufferedDeletes.clear();
+    this.currentDeleteBufferSize = 0;
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (this.closed) {
+      return;
+    }
+    flushCommits();
+    this.closed = true;
+    delegate.close();
+  }
+
+  @Override
+  public void flushCommits() throws IOException {
+    flushDeletes();
+    delegate.flushCommits();
+  }
+
+  @Override
+  public byte[] getTableName() {
+    return delegate.getTableName();
+  }
+
+  @Override
+  public TableName getName() {
+    return delegate.getName();
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return delegate.getConfiguration();
+  }
+
+  @Override
+  public HTableDescriptor getTableDescriptor() throws IOException {
+    return delegate.getTableDescriptor();
+  }
+
+  @Override
+  public boolean exists(Get get) throws IOException {
+    return delegate.exists(get);
+  }
+
+  @Override
+  public Boolean[] exists(List<Get> gets) throws IOException {
+    return delegate.exists(gets);
+  }
+
+  @Override
+  public void batch(List<? extends Row> actions, Object[] results)
+      throws IOException, InterruptedException {
+    delegate.batch(actions, results);
+  }
+
+  @Override
+  public Object[] batch(List<? extends Row> actions) throws IOException, InterruptedException {
+    return delegate.batch(actions);
+  }
+
+  @Override
+  public <R> void batchCallback(List<? extends Row> actions, Object[] results,
+      Batch.Callback<R> callback) throws IOException, InterruptedException {
+    delegate.batchCallback(actions, results, callback);
+  }
+
+  @Override
+  public <R> Object[] batchCallback(List<? extends Row> actions, Batch.Callback<R> callback)
+      throws IOException, InterruptedException {
+    return delegate.batchCallback(actions, callback);
+  }
+
+  @Override
+  public Result get(Get get) throws IOException {
+    return delegate.get(get);
+  }
+
+  @Override
+  public Result[] get(List<Get> gets) throws IOException {
+    return delegate.get(gets);
+  }
+
+  @Override
+  public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
+    return delegate.getRowOrBefore(row, family);
+  }
+
+  @Override
+  public ResultScanner getScanner(Scan scan) throws IOException {
+    return delegate.getScanner(scan);
+  }
+
+  @Override
+  public ResultScanner getScanner(byte[] family) throws IOException {
+    return delegate.getScanner(family);
+  }
+
+  @Override
+  public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException {
+    return delegate.getScanner(family, qualifier);
+  }
+
+  @Override
+  public void put(Put put) throws IOException {
+    delegate.put(put);
+  }
+
+  @Override
+  public void put(List<Put> puts) throws IOException {
+    delegate.put(puts);
+  }
+
+  @Override
+  public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, byte[] value, Put put)
+      throws IOException {
+    return delegate.checkAndPut(row, family, qualifier, value, put);
+  }
+
+  @Override
+  public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, byte[] value,
+      Delete delete) throws IOException {
+    return delegate.checkAndDelete(row, family, qualifier, value, delete);
+  }
+
+  @Override
+  public void mutateRow(RowMutations rm) throws IOException {
+    delegate.mutateRow(rm);
+  }
+
+  @Override
+  public Result append(Append append) throws IOException {
+    return delegate.append(append);
+  }
+
+  @Override
+  public Result increment(Increment increment) throws IOException {
+    return delegate.increment(increment);
+  }
+
+  @Override
+  public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount)
+      throws IOException {
+    return delegate.incrementColumnValue(row, family, qualifier, amount);
+  }
+
+  @Override
+  public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount,
+      Durability durability) throws IOException {
+    return delegate.incrementColumnValue(row, family, qualifier, amount, durability);
+  }
+
+  @Override
+  public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount,
+      boolean writeToWAL) throws IOException {
+    return delegate.incrementColumnValue(row, family, qualifier, amount, writeToWAL);
+  }
+
+  @Override
+  public boolean isAutoFlush() {
+    return delegate.isAutoFlush();
+  }
+
+  @Override
+  public CoprocessorRpcChannel coprocessorService(byte[] row) {
+    return delegate.coprocessorService(row);
+  }
+
+  @Override
+  public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service,
+      byte[] startKey, byte[] endKey, Batch.Call<T, R> callable) throws Throwable {
+    return delegate.coprocessorService(service, startKey, endKey, callable);
+  }
+
+  @Override
+  public <T extends Service, R> void coprocessorService(Class<T> service, byte[] startKey,
+      byte[] endKey, Batch.Call<T, R> callable, Batch.Callback<R> callback) throws Throwable {
+    delegate.coprocessorService(service, startKey, endKey, callable, callback);
+  }
+
+  @Override
+  public void setAutoFlush(boolean autoFlush) {
+    delegate.setAutoFlush(autoFlush);
+  }
+
+  @Override
+  public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
+    delegate.setAutoFlush(autoFlush, clearBufferOnFail);
+  }
+
+  @Override
+  public void setAutoFlushTo(boolean autoFlush) {
+    delegate.setAutoFlushTo(autoFlush);
+  }
+
+  @Override
+  public long getWriteBufferSize() {
+    return delegate.getWriteBufferSize();
+  }
+
+  @Override
+  public <R extends Message> Map<byte[], R> batchCoprocessorService(
+      Descriptors.MethodDescriptor methodDescriptor, Message request, byte[] startKey,
+      byte[] endKey, R responsePrototype) throws Throwable {
+    return delegate.batchCoprocessorService(methodDescriptor, request, startKey, endKey,
+      responsePrototype);
+  }
+
+  @Override
+  public <R extends Message> void batchCoprocessorService(
+      Descriptors.MethodDescriptor methodDescriptor, Message request, byte[] startKey,
+      byte[] endKey, R responsePrototype, Batch.Callback<R> callback) throws Throwable {
+    delegate.batchCoprocessorService(methodDescriptor, request, startKey, endKey,
+      responsePrototype, callback);
+  }
+
+  @Override
+  public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
+      CompareFilter.CompareOp compareOp, byte[] value, RowMutations mutation) throws IOException {
+    return delegate.checkAndMutate(row, family, qualifier, compareOp, value, mutation);
+  }
+}
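For review context, a minimal walkthrough of the buffering semantics
implemented above. BufferedHTable is package-private, so this only compiles
inside org.apache.hadoop.hbase.mapreduce; the class and table names are
hypothetical.

```java
package org.apache.hadoop.hbase.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.util.Bytes;

public class BufferedHTableWalkthrough {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean(BufferedHTable.BUFFER_DELETES, true);
    BufferedHTable table = new BufferedHTable(conf, "dest_table");
    table.delete(new Delete(Bytes.toBytes("r1"))); // buffered: no RPC yet
    // Each buffered Delete adds its heapSize() to a running total; the buffer
    // is flushed automatically once that total exceeds getWriteBufferSize().
    table.delete(new Delete(Bytes.toBytes("r2")));
    table.flushCommits(); // sends both Deletes to the server in one batched RPC
    table.close();        // close() also flushes before closing the delegate
  }
}
```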
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java
index 5902a8e..2199513 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Durability;
@@ -72,7 +72,7 @@ public class MultiTableOutputFormat extends OutputFormat<ImmutableBytesWritable, Mutation> {
   protected static class MultiTableRecordWriter extends
       RecordWriter<ImmutableBytesWritable, Mutation> {
     private static final Log LOG = LogFactory.getLog(MultiTableRecordWriter.class);
-    Map<ImmutableBytesWritable, HTable> tables;
+    Map<ImmutableBytesWritable, HTableInterface> tables;
     Configuration conf;
     boolean useWriteAheadLogging;
 
@@ -87,7 +87,7 @@ public class MultiTableOutputFormat extends OutputFormat<ImmutableBytesWritable, Mutation> {
         boolean useWriteAheadLogging) {
       LOG.debug("Created new MultiTableRecordReader with WAL "
           + (useWriteAheadLogging ? "on" : "off"));
-      this.tables = new HashMap<ImmutableBytesWritable, HTable>();
+      this.tables = new HashMap<ImmutableBytesWritable, HTableInterface>();
       this.conf = conf;
       this.useWriteAheadLogging = useWriteAheadLogging;
     }
@@ -99,10 +99,10 @@ public class MultiTableOutputFormat extends OutputFormat<ImmutableBytesWritable, Mutation> {
      * @throws IOException
      *           if there is a problem opening a table
      */
-    HTable getTable(ImmutableBytesWritable tableName) throws IOException {
+    HTableInterface getTable(ImmutableBytesWritable tableName) throws IOException {
       if (!tables.containsKey(tableName)) {
         LOG.debug("Opening HTable \"" + Bytes.toString(tableName.get()) + "\" for writing");
-        HTable table = new HTable(conf, TableName.valueOf(tableName.get()));
+        HTableInterface table = new BufferedHTable(conf, tableName.get());
         table.setAutoFlush(false, true);
         tables.put(tableName, table);
       }
@@ ... @@
     @Override
     public void close(TaskAttemptContext context) throws IOException {
-      for (HTable table : tables.values()) {
+      for (HTableInterface table : tables.values()) {
         table.flushCommits();
       }
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
@@ ... @@
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
@@ ... @@ implements Configurable {
   protected static class TableRecordWriter
   extends RecordWriter<KEY, Mutation> {
 
     /** The table to write to. */
-    private HTable table;
+    private HTableInterface table;
 
     /**
      * Instantiate a TableRecordWriter with the HBase HClient for writing.
      *
      * @param table  The table to write to.
      */
-    public TableRecordWriter(HTable table) {
+    public TableRecordWriter(HTableInterface table) {
       this.table = table;
     }
@@ -203,7 +203,7 @@ implements Configurable {
         if (zkClientPort != 0) {
           this.conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort);
         }
-        this.table = new HTable(this.conf, tableName);
+        this.table = new BufferedHTable(this.conf, tableName);
         this.table.setAutoFlush(false, true);
         LOG.info("Created table instance for " + tableName);
       } catch(IOException e) {
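Sketch of a consumer of these changes: a reducer emitting Deletes through
TableOutputFormat (hypothetical class, not part of the patch). Previously each
context.write of a Delete turned into one RPC; with buffering enabled, the
record writer now batches them.

```java
import java.io.IOException;

import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;

public class DeleteRowsReducer
    extends TableReducer<ImmutableBytesWritable, NullWritable, ImmutableBytesWritable> {
  @Override
  protected void reduce(ImmutableBytesWritable row, Iterable<NullWritable> values,
      Context context) throws IOException, InterruptedException {
    // One Delete per incoming row key; the TableOutputFormat record writer
    // (now backed by BufferedHTable) decides when to flush.
    context.write(row, new Delete(row.copyBytes()));
  }
}
```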
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestBufferedHTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestBufferedHTable.java
new file mode 100644
index 0000000..5f51823
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestBufferedHTable.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+
+import static org.mockito.Mockito.*;
+
+@Category(SmallTests.class)
+public class TestBufferedHTable {
+
+  private static final Delete DELETE1 = new Delete(Bytes.toBytes("row1"));
+  private static final Delete DELETE2 = new Delete(Bytes.toBytes("row2"));
+  private static final Delete DELETE3 = new Delete(Bytes.toBytes("row3"));
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  /**
+   * Verifies Delete buffering is disabled by default.
+   */
+  @Test
+  public void testDeleteBufferingOffByDefault() throws IOException {
+    HTableInterface mock = mock(HTableInterface.class);
+    BufferedHTable table = createBufferedHTable(mock, false);
+    // delete calls are delegated to HTable immediately when buffering is disabled
+    table.delete(DELETE1);
+    verify(mock).delete(DELETE1);
+    Assert.assertEquals(0, table.bufferedDeletes.size());
+
+    List<Delete> deletes = new ArrayList<Delete>(1);
+    deletes.add(DELETE2);
+    table.delete(deletes);
+    verify(mock).delete(deletes);
+    Assert.assertEquals(0, table.bufferedDeletes.size());
+  }
+
+  /**
+   * Verifies Deletes are buffered and automatically flushed when the write buffer limit
+   * is reached.
+   */
+  @Test
+  public void testDeleteBuffering() throws IOException {
+    HTableInterface mock = mock(HTableInterface.class);
+    BufferedHTable table = createBufferedHTable(mock, true);
+    // return a large buffer size so all deletes are buffered
+    doReturn(Long.MAX_VALUE).when(mock).getWriteBufferSize();
+    table.delete(DELETE1);
+    verify(mock, never()).delete(DELETE1);
+    Assert.assertEquals(1, table.bufferedDeletes.size());
+
+    List<Delete> deletes = new ArrayList<Delete>();
+    deletes.add(DELETE2);
+    deletes.add(DELETE3);
+    table.delete(deletes);
+    verify(mock, never()).delete(deletes);
+    Assert.assertEquals(3, table.bufferedDeletes.size());
+
+    // return a small buffer size to trigger a flush on the next delete
+    doReturn(10L).when(mock).getWriteBufferSize();
+    table.delete(new Delete(Bytes.toBytes("flush")));
+    verify(mock).delete(anyListOf(Delete.class));
+    Assert.assertEquals(0, table.bufferedDeletes.size());
+  }
+
+  /**
+   * Verifies buffered Deletes are flushed when the table is explicitly closed.
+   */
+  @Test
+  public void testExplicitDeleteFlushing() throws IOException {
+    HTableInterface mock = mock(HTableInterface.class);
+    BufferedHTable table = createBufferedHTable(mock, true);
+    // return a large buffer size so all deletes are buffered
+    doReturn(Long.MAX_VALUE).when(mock).getWriteBufferSize();
+    List<Delete> deletes = Arrays.asList(DELETE1, DELETE2);
+    table.delete(deletes);
+    verify(mock, never()).delete(deletes);
+    Assert.assertEquals(deletes.size(), table.bufferedDeletes.size());
+
+    // explicitly close the table to force a flush
+    table.close();
+    verify(mock).delete(anyListOf(Delete.class));
+    verify(mock).flushCommits();
+    verify(mock).close();
+    Assert.assertEquals(0, table.bufferedDeletes.size());
+
+    // deletes are not allowed after closing
+    thrown.expect(IllegalStateException.class);
+    table.delete(DELETE3);
+  }
+
+  private BufferedHTable createBufferedHTable(HTableInterface mock, boolean enableBuffering) {
+    Configuration conf = new Configuration();
+    if (enableBuffering) {
+      // buffering is disabled by default and must be explicitly enabled in the job configuration
+      conf.setBoolean(BufferedHTable.BUFFER_DELETES, true);
+    }
+    return new BufferedHTable(conf, mock);
+  }
+}
--
1.8.5.2
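One path the tests above do not cover is the flush triggered when
setWriteBufferSize shrinks the limit below the already-buffered heap size. A
possible companion test, sketched against the same fields and mock helper as
TestBufferedHTable (the method name is hypothetical):

```java
  @Test
  public void testFlushWhenWriteBufferShrinks() throws IOException {
    HTableInterface mock = mock(HTableInterface.class);
    BufferedHTable table = createBufferedHTable(mock, true);
    // large buffer size so the first delete stays buffered
    doReturn(Long.MAX_VALUE).when(mock).getWriteBufferSize();
    table.delete(DELETE1);
    verify(mock, never()).delete(anyListOf(Delete.class));
    // shrinking the write buffer below the buffered heap size forces a flush
    table.setWriteBufferSize(1L);
    verify(mock).delete(anyListOf(Delete.class));
    Assert.assertEquals(0, table.bufferedDeletes.size());
  }
```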