Index: src/java/org/apache/hadoop/hbase/client/HTable.java =================================================================== --- src/java/org/apache/hadoop/hbase/client/HTable.java (revision 808310) +++ src/java/org/apache/hadoop/hbase/client/HTable.java (working copy) @@ -442,8 +442,10 @@ */ public synchronized void put(final Put put) throws IOException { validatePut(put); - writeBuffer.add(put); - currentWriteBufferSize += put.heapSize(); + synchronized (writeBuffer) { + writeBuffer.add(put); + currentWriteBufferSize += put.heapSize(); + } if(autoFlush || currentWriteBufferSize > writeBufferSize) { flushCommits(); } @@ -460,8 +462,10 @@ public synchronized void put(final List puts) throws IOException { for(Put put : puts) { validatePut(put); - writeBuffer.add(put); - currentWriteBufferSize += put.heapSize(); + synchronized (writeBuffer) { + writeBuffer.add(put); + currentWriteBufferSize += put.heapSize(); + } } if(autoFlush || currentWriteBufferSize > writeBufferSize) { flushCommits(); @@ -578,14 +582,20 @@ * @throws IOException */ public void flushCommits() throws IOException { - try { - connection.processBatchOfRows(writeBuffer, tableName); - } finally { - currentWriteBufferSize = 0; - writeBuffer.clear(); + int last = 0; + synchronized (writeBuffer) { + try { + last = connection.processBatchOfRows(writeBuffer, tableName); + } finally { + writeBuffer.subList(0, last).clear(); + currentWriteBufferSize = 0; + for (int i = 0; i < writeBuffer.size(); i++) { + currentWriteBufferSize += writeBuffer.get(i).heapSize(); + } + } } } - + /** * Release held resources * @@ -683,14 +693,6 @@ } } - /** - * Get the write buffer - * @return the current write buffer - */ - public ArrayList getWriteBuffer() { - return writeBuffer; - } - // Old API. Pre-hbase-880, hbase-1304. /** Index: src/java/org/apache/hadoop/hbase/client/HConnectionManager.java =================================================================== --- src/java/org/apache/hadoop/hbase/client/HConnectionManager.java (revision 808310) +++ src/java/org/apache/hadoop/hbase/client/HConnectionManager.java (working copy) @@ -998,10 +998,10 @@ return location; } - public void processBatchOfRows(ArrayList list, byte[] tableName) + public int processBatchOfRows(ArrayList list, byte[] tableName) throws IOException { if (list.isEmpty()) { - return; + return 0; } boolean retryOnlyOne = false; if (list.size() > 1) { @@ -1015,7 +1015,8 @@ byte [] region = currentRegion; boolean isLastRow = false; Put [] putarray = new Put[0]; - for (int i = 0, tries = 0; i < list.size() && tries < this.numRetries; i++) { + int i, tries; + for (i = 0, tries = 0; i < list.size() && tries < this.numRetries; i++) { Put put = list.get(i); currentPuts.add(put); // If the next Put goes to a new region, then we are to clear @@ -1073,6 +1074,7 @@ currentPuts.clear(); } } + return i; } void close(boolean stopProxy) { Index: src/java/org/apache/hadoop/hbase/client/HConnection.java =================================================================== --- src/java/org/apache/hadoop/hbase/client/HConnection.java (revision 808310) +++ src/java/org/apache/hadoop/hbase/client/HConnection.java (working copy) @@ -189,6 +189,6 @@ * @param tableName The name of the table * @throws IOException */ - public void processBatchOfRows(ArrayList list, byte[] tableName) + public int processBatchOfRows(ArrayList list, byte[] tableName) throws IOException; } \ No newline at end of file