commit 34b779836ac8ae2948f0cea58c4c28bda0fdf64a Author: nspiegelberg Date: 3 hours ago HBASE-3794 : Ability to Discard Bad HTable Puts diff --git a/CHANGES.txt b/CHANGES.txt index 80fb732..ae9f069 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -119,6 +119,7 @@ Release 0.91.0 - Unreleased to load codec (Alejandro Abdelnur) HBASE-3976 Disable block cache on compactions (Karthik Sankarachary) HBASE-3979 Trivial fixes in code, document (Ming Ma) + HBASE-3794 Ability to Discard Bad HTable Puts IMPROVEMENTS HBASE-3290 Max Compaction Size (Nicolas Spiegelberg via Stack) diff --git a/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/src/main/java/org/apache/hadoop/hbase/client/HTable.java index c82e1dd..b97027d 100644 --- a/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -104,6 +104,7 @@ public class HTable implements HTableInterface, Closeable { private volatile Configuration configuration; private final ArrayList writeBuffer = new ArrayList(); private long writeBufferSize; + private boolean clearBufferOnFail; private boolean autoFlush; private long currentWriteBufferSize; protected int scannerCaching; @@ -187,6 +188,7 @@ public class HTable implements HTableInterface, Closeable { this.configuration = conf; this.connection.locateRegion(tableName, HConstants.EMPTY_START_ROW); this.writeBufferSize = conf.getLong("hbase.client.write.buffer", 2097152); + this.clearBufferOnFail = true; this.autoFlush = true; this.currentWriteBufferSize = 0; this.scannerCaching = conf.getInt("hbase.client.scanner.caching", 1); @@ -833,10 +835,15 @@ public class HTable implements HTableInterface, Closeable { try { connection.processBatchOfPuts(writeBuffer, tableName, pool); } finally { - // the write buffer was adjusted by processBatchOfPuts - currentWriteBufferSize = 0; - for (Put aPut : writeBuffer) { - currentWriteBufferSize += aPut.heapSize(); + if (clearBufferOnFail) { + writeBuffer.clear(); + currentWriteBufferSize = 0; + } else { + // the write buffer was adjusted by processBatchOfPuts + currentWriteBufferSize = 0; + for (Put aPut : writeBuffer) { + currentWriteBufferSize += aPut.heapSize(); + } } } } @@ -916,25 +923,45 @@ public class HTable implements HTableInterface, Closeable { } /** + * See {@link #setAutoFlush(boolean, boolean)} + * + * @param autoFlush + * Whether or not to enable 'auto-flush'. + */ + public void setAutoFlush(boolean autoFlush) { + setAutoFlush(autoFlush, autoFlush); + } + + /** * Turns 'auto-flush' on or off. *

* When enabled (default), {@link Put} operations don't get buffered/delayed - * and are immediately executed. This is slower but safer. + * and are immediately executed. Failed operations are not retried. This is + * slower but safer. *

- * Turning this off means that multiple {@link Put}s will be accepted before - * any RPC is actually sent to do the write operations. If the application - * dies before pending writes get flushed to HBase, data will be lost. - * Other side effects may include the fact that the application thinks a - * {@link Put} was executed successfully whereas it was in fact only - * buffered and the operation may fail when attempting to flush all pending - * writes. In that case though, the code will retry the failed {@link Put} - * upon its next attempt to flush the buffer. + * Turning off {@link #autoFlush} means that multiple {@link Put}s will be + * accepted before any RPC is actually sent to do the write operations. If the + * application dies before pending writes get flushed to HBase, data will be + * lost. + *

+ * When you turn {@link #autoFlush} off, you should also consider the + * {@link #clearBufferOnFail} option. By default, asynchronous {@link Put) + * requests will be retried on failure until successful. However, this can + * pollute the writeBuffer and slow down batching performance. Additionally, + * you may want to issue a number of Put requests and call + * {@link #flushCommits()} as a barrier. In both use cases, consider setting + * clearBufferOnFail to true to erase the buffer after {@link #flushCommits()} + * has been called, regardless of success. * - * @param autoFlush Whether or not to enable 'auto-flush'. + * @param autoFlush + * Whether or not to enable 'auto-flush'. + * @param clearBufferOnFail + * Whether to keep Put failures in the writeBuffer * @see #flushCommits */ - public void setAutoFlush(boolean autoFlush) { + public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) { this.autoFlush = autoFlush; + this.clearBufferOnFail = autoFlush || clearBufferOnFail; } /**