Index: src/main/java/org/apache/hadoop/hbase/client/HTableUtil.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/HTableUtil.java (revision 0)
+++ src/main/java/org/apache/hadoop/hbase/client/HTableUtil.java (revision 0)
@@ -0,0 +1,152 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.hbase.HRegionLocation;
+
+/**
+ * Utility class for {@link HTable} that groups Puts and Rows into
+ * per-RegionServer buckets before submitting them, in order to minimize
+ * the number of RegionServer RPCs per flush.
+ */
+public class HTableUtil {
+
+ private static final int INITIAL_LIST_SIZE = 250;
+
+ /**
+ * Processes a List of Puts and writes them to an HTable instance in RegionServer buckets via the htable.put method.
+ * This utilizes the writeBuffer, so the write buffer flush frequency may be tuned via htable.setWriteBufferSize.
+ *
+ * The benefit of submitting Puts in this manner is to minimize the number of RegionServer RPCs in each flush.
+ *
+ * Assumption #1: Regions have been pre-created for the table. If they haven't, then all of the Puts will go to the same region,
+ * defeating the purpose of this utility method. See the Apache HBase book for an explanation of how to do this.
+ *
+ * Assumption #2: Row-keys are not monotonically increasing. See the Apache HBase book for an explanation of this problem.
+ *
+ * Assumption #3: The input list of Puts is big enough to be useful (in the thousands or more). The intent of this
+ * method is to process larger chunks of data.
+ *
+ * Assumption #4: htable.setAutoFlush(false) has been set. This is a requirement to use the writeBuffer, as shown in the sketch below.
+ *
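+ * <p>A minimal usage sketch follows. The table name, write buffer size, and the
+ * origin of the Put list are illustrative assumptions, not part of this API:
+ * <pre>
+ *   HTable htable = new HTable(conf, "myTable"); // assumes "myTable" is pre-split
+ *   htable.setAutoFlush(false);                  // required to use the writeBuffer
+ *   htable.setWriteBufferSize(12 * 1024 * 1024); // illustrative size
+ *   List&lt;Put&gt; puts = buildPuts();          // buildPuts() is a hypothetical helper
+ *   HTableUtil.bucketRsPut(htable, puts);
+ * </pre>
+ *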
+ * @param htable HTable instance for target HBase table
+ * @param puts List of Put instances
+ * @throws IOException if a remote or network exception occurs
+ *
+ */
+  public static void bucketRsPut(HTable htable, List<Put> puts) throws IOException {
+    Map<String, List<Put>> putMap = createRsPutMap(htable, puts);
+    for (List<Put> rsPuts : putMap.values()) {
+      htable.put(rsPuts);
+    }
+    htable.flushCommits();
+  }
+
+ /**
+ * Processes a List of Rows (Put, Delete) and writes them to an HTable instance in RegionServer buckets via the htable.batch method.
+ *
+ * The benefit of submitting Rows in this manner is to minimize the number of RegionServer RPCs: each call to
+ * htable.batch contains only Rows destined for a single RegionServer.
+ *
+ * Assumption #1: Regions have been pre-created for the table. If they haven't, then all of the Puts will go to the same region,
+ * defeating the purpose of this utility method. See the Apache HBase book for an explanation of how to do this.
+ *
+ * Assumption #2: Row-keys are not monotonically increasing. See the Apache HBase book for an explanation of this problem.
+ *
+ * Assumption #3: The input list of Rows is big enough to be useful (in the thousands or more). The intent of this
+ * method is to process larger chunks of data.
+ *
+ * This method accepts a list of Row objects because the underlying .batch method accepts a list of Row objects.
+ *
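+ * <p>A minimal usage sketch follows. The table name and the individual Put and
+ * Delete instances are illustrative assumptions:
+ * <pre>
+ *   HTable htable = new HTable(conf, "myTable"); // assumes "myTable" is pre-split
+ *   List&lt;Row&gt; rows = new ArrayList&lt;Row&gt;();
+ *   rows.add(somePut);                           // hypothetical Put
+ *   rows.add(someDelete);                        // hypothetical Delete
+ *   HTableUtil.bucketRsBatch(htable, rows);
+ * </pre>
+ *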
+ * @param htable HTable instance for target HBase table
+ * @param rows List of Row instances
+ * @throws IOException if a remote or network exception occurs
+ */
+  public static void bucketRsBatch(HTable htable, List<Row> rows) throws IOException {
+    try {
+      Map<String, List<Row>> rowMap = createRsRowMap(htable, rows);
+      for (List<Row> rsRows : rowMap.values()) {
+        htable.batch(rsRows);
+      }
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    }
+  }
+
+  private static Map<String, List<Put>> createRsPutMap(HTable htable, List<Put> puts) throws IOException {
+    Map<String, List<Put>> putMap = new HashMap<String, List<Put>>();
+    for (Put put : puts) {
+      // Find the RegionServer hosting the region for this row-key and bucket
+      // the Put under that server's hostname.
+      HRegionLocation rl = htable.getRegionLocation(put.getRow());
+      String hostname = rl.getHostname();
+      List<Put> recs = putMap.get(hostname);
+      if (recs == null) {
+        recs = new ArrayList<Put>(INITIAL_LIST_SIZE);
+        putMap.put(hostname, recs);
+      }
+      recs.add(put);
+    }
+    return putMap;
+  }
+
+  private static Map<String, List<Row>> createRsRowMap(HTable htable, List<Row> rows) throws IOException {
+    Map<String, List<Row>> rowMap = new HashMap<String, List<Row>>();
+    for (Row row : rows) {
+      // Same per-RegionServer bucketing as createRsPutMap, but for generic
+      // Rows (Put, Delete).
+      HRegionLocation rl = htable.getRegionLocation(row.getRow());
+      String hostname = rl.getHostname();
+      List<Row> recs = rowMap.get(hostname);
+      if (recs == null) {
+        recs = new ArrayList<Row>(INITIAL_LIST_SIZE);
+        rowMap.put(hostname, recs);
+      }
+      recs.add(row);
+    }
+    return rowMap;
+  }
+
+}
Index: src/test/java/org/apache/hadoop/hbase/client/TestHTableUtil.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/client/TestHTableUtil.java (revision 0)
+++ src/test/java/org/apache/hadoop/hbase/client/TestHTableUtil.java (revision 0)
@@ -0,0 +1,125 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * This class provides tests for the {@link HTableUtil} class
+ *
+ */
+public class TestHTableUtil {
+ final Log LOG = LogFactory.getLog(getClass());
+ private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private static byte [] ROW = Bytes.toBytes("testRow");
+ private static byte [] FAMILY = Bytes.toBytes("testFamily");
+ private static byte [] QUALIFIER = Bytes.toBytes("testQualifier");
+ private static byte [] VALUE = Bytes.toBytes("testValue");
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TEST_UTIL.startMiniCluster(3);
+ }
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ /**
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testBucketPut() throws Exception {
+ byte [] TABLE = Bytes.toBytes("testBucketPut");
+ HTable ht = TEST_UTIL.createTable(TABLE, FAMILY);
+ ht.setAutoFlush( false );
+
+ List<Put> puts = new ArrayList<Put>();
+ puts.add( createPut("row1") );
+ puts.add( createPut("row2") );
+ puts.add( createPut("row3") );
+ puts.add( createPut("row4") );
+
+ HTableUtil.bucketRsPut( ht, puts );
+
+ Scan scan = new Scan();
+ scan.addColumn(FAMILY, QUALIFIER);
+ int count = 0;
+ for(Result result : ht.getScanner(scan)) {
+ count++;
+ }
+ LOG.info("bucket put count=" + count);
+ assertEquals(puts.size(), count);
+ }
+
+ private Put createPut(String row) {
+ Put put = new Put( Bytes.toBytes(row));
+ put.add(FAMILY, QUALIFIER, VALUE);
+ return put;
+ }
+
+ /**
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testBucketBatch() throws Exception {
+ byte [] TABLE = Bytes.toBytes("testBucketBatch");
+ HTable ht = TEST_UTIL.createTable(TABLE, FAMILY);
+
+ List<Row> rows = new ArrayList<Row>();
+ rows.add( createPut("row1") );
+ rows.add( createPut("row2") );
+ rows.add( createPut("row3") );
+ rows.add( createPut("row4") );
+
+ HTableUtil.bucketRsBatch( ht, rows );
+
+ Scan scan = new Scan();
+ scan.addColumn(FAMILY, QUALIFIER);
+
+ int count = 0;
+ for(Result result : ht.getScanner(scan)) {
+ count++;
+ }
+ LOG.info("bucket batch count=" + count);
+ assertEquals(rows.size(), count);
+ }
+
+}