diff --git .gitignore .gitignore index 4208d61..134eeaa 100644 --- .gitignore +++ .gitignore @@ -1,3 +1,4 @@ +/classes /.classpath /.externalToolBuilders /.project diff --git core/src/main/java/org/apache/hadoop/hbase/client/HTable.java core/src/main/java/org/apache/hadoop/hbase/client/HTable.java index 2d0edc1..30d868d 100644 --- core/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ core/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -62,6 +62,8 @@ import java.util.concurrent.atomic.AtomicInteger; * row locking. */ public class HTable implements HTableInterface { + private static final Log LOG = LogFactory.getLog(HTable.class); + private final HConnection connection; private final byte [] tableName; protected final int scannerTimeout; @@ -482,36 +484,45 @@ public class HTable implements HTableInterface { /** * Commit a Put to the table. *

- * If autoFlush is false, the update is buffered. + * If autoFlush is false, the update is buffered and returning 'false' is ok. + * If autoFlush is true, and the method returns false, then the commit did not go through completely. * @param put data to put + * @return true, if the put has been marshalled successfully to the server. + * false, otherwise. * @throws IOException * @since 0.20.0 */ - public synchronized void put(final Put put) throws IOException { - doPut(Arrays.asList(put)); + public synchronized boolean put(final Put put) throws IOException { + return doPut(Arrays.asList(put)); } /** * Commit a List of Puts to the table. *

- * If autoFlush is false, the update is buffered. + * If autoFlush is false, the update is buffered and returning 'false' is ok. + * If autoFlush is true, and the method returns false, then the commit did not go through completely. * @param puts list of puts + * @return true, if the put has been marshalled successfully to the server. + * false, otherwise. * @throws IOException if a remote or network exception occurs + * * @since 0.20.0 */ - public synchronized void put(final List puts) throws IOException { - doPut(puts); + public synchronized boolean put(final List puts) throws IOException { + return doPut(puts); } - private void doPut(final List puts) throws IOException { + private boolean doPut(final List puts) throws IOException { + boolean sent = false; for (Put put : puts) { validatePut(put); writeBuffer.add(put); currentWriteBufferSize += put.heapSize(); } if (autoFlush || currentWriteBufferSize > writeBufferSize) { - flushCommits(); + sent = flushCommits(); } + return sent; } /** @@ -623,27 +634,35 @@ public class HTable implements HTableInterface { * Commit to the table the buffer of Puts. * Called automatically in the commit methods when autoFlush is true. * @throws IOException e + * @return true, if all underlying commits are in successfully. + * false, otherwise. Implying, some of the commits have been left out. Atomicity not guaranteed. */ - public void flushCommits() throws IOException { + public boolean flushCommits() throws IOException { + boolean flushed = false; try { connection.processBatchOfPuts(writeBuffer, tableName, pool); + flushed = true; } finally { // the write buffer was adjusted by processBatchOfPuts + flushed = (writeBuffer.size() == 0); currentWriteBufferSize = 0; for (Put aWriteBuffer : writeBuffer) { currentWriteBufferSize += aWriteBuffer.heapSize(); } } + return flushed; } - + /** * Release held resources * * @throws IOException */ public void close() throws IOException{ - flushCommits(); + if (!flushCommits() ) { + LOG.warn("Flushing commits did not succeed for " + this.writeBuffer.size() + " rows while closing "); + } } // validate for well-formedness diff --git core/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java core/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java index 1416ac7..bc99534 100644 --- core/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java +++ core/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java @@ -122,22 +122,28 @@ public interface HTableInterface { /** * Commit a Put to the table. *

- * If autoFlush is false, the update is buffered. + * If autoFlush is false, the update is buffered and returning 'false' is ok. + * If autoFlush is true, and the method returns false, then the commit did not go through. * * @param put data * @throws IOException e + * @return true, if the put has been marshalled successfully to the server. + * false, otherwise. */ - void put(Put put) throws IOException; + boolean put(Put put) throws IOException; /** * Commit a List of Puts to the table. *

- * If autoFlush is false, the update is buffered. + * If autoFlush is false, the update is buffered and returning 'false' is ok. + * If autoFlush is true, and the method returns false, then the commit did not go through completely. * * @param puts list of puts * @throws IOException e + * @return true, if the put has been marshalled successfully to the server. + * false, otherwise. */ - void put(List puts) throws IOException; + boolean put(List puts) throws IOException; /** * Atomically checks if a row/family/qualifier value matches the expected @@ -217,8 +223,10 @@ public interface HTableInterface { * Flushes buffer data. Called automatically when autoFlush is true. * * @throws IOException e + * @return true, when all pending commits have been committed successfully. + * false, otherwise. Implies, some of the puts did not get committed successfully. */ - void flushCommits() throws IOException; + boolean flushCommits() throws IOException; /** * Releases held resources.