Index: src/test/org/apache/hadoop/hbase/TestMultiParallelPut.java =================================================================== --- src/test/org/apache/hadoop/hbase/TestMultiParallelPut.java (revision 939184) +++ src/test/org/apache/hadoop/hbase/TestMultiParallelPut.java (working copy) @@ -1,97 +0,0 @@ -/* - * Copyright 2009 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase; - -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.util.Bytes; - -import java.util.List; -import java.util.ArrayList; - -public class TestMultiParallelPut extends MultiRegionTable { - private static final byte[] VALUE = Bytes.toBytes("value"); - private static final byte[] QUALIFIER = Bytes.toBytes("qual"); - private static final String FAMILY = "family"; - private static final String TEST_TABLE = "test_table"; - private static final byte[] BYTES_FAMILY = Bytes.toBytes(FAMILY); - - - public TestMultiParallelPut() { - super(2, FAMILY); - desc = new HTableDescriptor(TEST_TABLE); - desc.addFamily(new HColumnDescriptor(FAMILY)); - - makeKeys(); - } - - private void makeKeys() { - for (byte [] k : KEYS) { - byte [] cp = new byte[k.length+1]; - System.arraycopy(k, 0, cp, 0, k.length); - cp[k.length] = 1; - - keys.add(cp); - } - } - - List keys = new ArrayList(); - - public void testMultiPut() throws Exception { - - HTable table = new HTable(TEST_TABLE); - table.setAutoFlush(false); - table.setWriteBufferSize(10 * 1024 * 1024); - - for ( byte [] k : keys ) { - Put put = new Put(k); - put.add(BYTES_FAMILY, QUALIFIER, VALUE); - - table.put(put); - } - - table.flushCommits(); - - for (byte [] k : keys ) { - Get get = new Get(k); - get.addColumn(BYTES_FAMILY, QUALIFIER); - - Result r = table.get(get); - - assertTrue(r.containsColumn(BYTES_FAMILY, QUALIFIER)); - assertEquals(0, - Bytes.compareTo(VALUE, - r.getValue(BYTES_FAMILY, QUALIFIER))); - } - - HBaseAdmin admin = new HBaseAdmin(conf); - ClusterStatus cs = admin.getClusterStatus(); - - assertEquals(2, cs.getServers()); - for ( HServerInfo info : cs.getServerInfo()) { - System.out.println(info); - assertTrue( info.getLoad().getNumberOfRegions() > 10); - } - } -} Index: src/test/org/apache/hadoop/hbase/TestMultiParallel.java =================================================================== --- src/test/org/apache/hadoop/hbase/TestMultiParallel.java (revision 0) +++ src/test/org/apache/hadoop/hbase/TestMultiParallel.java (revision 0) @@ -0,0 +1,401 @@ +/* + * Copyright 2009 The Apache Software Foundation + * + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase; + +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.Row; +import org.apache.hadoop.hbase.util.Bytes; + +import java.io.IOException; +import java.util.List; +import java.util.ArrayList; + +public class TestMultiParallel extends MultiRegionTable { + + private static final byte[] VALUE = Bytes.toBytes("value"); + private static final byte[] QUALIFIER = Bytes.toBytes("qual"); + private static final String FAMILY = "family"; + private static final String TEST_TABLE = "multi_test_table"; + private static final byte[] BYTES_FAMILY = Bytes.toBytes(FAMILY); + private static final byte[] ONE_ROW = Bytes.toBytes("xxx"); + + List keys = new ArrayList(); + + public TestMultiParallel() { + super(2, FAMILY); + desc = new HTableDescriptor(TEST_TABLE); + desc.addFamily(new HColumnDescriptor(FAMILY)); + makeKeys(); + } + + private void makeKeys() { + // Create a "non-uniform" test set with the following characteristics: + // a) Unequal number of keys per region + + // Don't use integer as a multiple, so that we have a number of keys that is not a multiple of the number of regions + int numKeys = (int) ( (float)KEYS.length * 10.33F ); + + for (int i=0; i puts = constructPutRequests(); + table.batch(puts); + + // create a list of gets and run it + List gets = new ArrayList(); + for (byte [] k : keys) { + Get get = new Get(k); + get.addColumn(BYTES_FAMILY, QUALIFIER); + gets.add(get); + } + Result[] multiRes = new Result[gets.size()]; + table.batch(gets, multiRes); + + // Same gets using individual call API + List singleRes = new ArrayList(); + for (Row get : gets) { + singleRes.add(table.get((Get) get)); + } + + // Compare results + assertEquals(singleRes.size(), multiRes.length); + for (int i = 0; i < singleRes.size(); i++) { + assertTrue(singleRes.get(i).containsColumn(BYTES_FAMILY, QUALIFIER)); + KeyValue[] singleKvs = singleRes.get(i).raw(); + KeyValue[] multiKvs = multiRes[i].raw(); + for (int j = 0; j < singleKvs.length; j++) { + assertEquals(singleKvs[j], multiKvs[j]); + assertEquals(0,Bytes.compareTo(singleKvs[j].getValue(), multiKvs[j].getValue())); + } + } + } + + /** + * Only run one Multi test with a forced RegionServer abort. Otherwise, the unit tests + * will take an unnecessarily long time to run. 
+ * @throws Exception + */ + public void testFlushCommitsWithAbort() throws Exception { + doTestFlushCommits(true); + } + + public void testFlushCommitsNoAbort() throws Exception { + doTestFlushCommits(false); + } + + public void doTestFlushCommits(boolean doAbort) throws Exception { + // Load the data + HTable table = new HTable(conf,TEST_TABLE); + table.setAutoFlush(false); + table.setWriteBufferSize(10 * 1024 * 1024); + + List puts = constructPutRequests(); + for (Row put : puts) { + table.put((Put) put); + } + table.flushCommits(); + + if (doAbort) { + cluster.abortRegionServer(0); + + // try putting more keys after the abort. same key/qual... just validating no exceptions thrown + puts = constructPutRequests(); + for (Row put : puts) { + table.put((Put) put); + } + + table.flushCommits(); + } + + validateLoadedData(table); + + // Validate server and region count + HBaseAdmin admin = new HBaseAdmin(conf); + ClusterStatus cs = admin.getClusterStatus(); + assertEquals( (doAbort ? 1 : 2) , cs.getServers()); + for ( HServerInfo info : cs.getServerInfo()) { + System.out.println(info); + assertTrue( info.getLoad().getNumberOfRegions() > 10); + } + } + + public void testBatchWithPut() throws Exception { + + HTable table = new HTable(conf,TEST_TABLE); + + // put multiple rows using a batch + List puts = constructPutRequests(); + + Result[] results = table.batch(puts); + validateSizeAndEmpty(results,keys.size()); + + if (true) { + cluster.abortRegionServer(0); + + puts = constructPutRequests(); + results = table.batch(puts); + validateSizeAndEmpty(results, keys.size()); + } + + validateLoadedData(table); + } + + public void testBatchWithDelete() throws Exception { + + HTable table = new HTable(conf,TEST_TABLE); + + // Load some data + List puts = constructPutRequests(); + Result[] results = table.batch(puts); + validateSizeAndEmpty(results,keys.size()); + + // Deletes + List deletes = new ArrayList(); + for (int i=0; i puts = constructPutRequests(); + Result[] results = table.batch(puts); + validateSizeAndEmpty(results,keys.size()); + + // Deletes + ArrayList deletes = new ArrayList(); + for (int i=0; i puts = new ArrayList(); + for (int i=0; i < 100; i++) { + Put put = new Put(ONE_ROW); + byte[] qual = Bytes.toBytes( "column" + i); + put.add(BYTES_FAMILY, qual, VALUE); + puts.add(put); + } + Result[] results = table.batch(puts); + + // validate + validateSizeAndEmpty(results,100); + + // get the data back and validate that it is correct + List gets = new ArrayList(); + for (int i=0; i < 100; i++) { + Get get = new Get(ONE_ROW); + byte[] qual = Bytes.toBytes( "column" + i); + get.addColumn(BYTES_FAMILY,qual); + gets.add(get); + } + Result[] multiRes = table.batch(gets, new Result[gets.size()]); + + int idx=0; + for (Result r : multiRes) { + byte[] qual = Bytes.toBytes( "column" + idx); + validateResult(r,qual,VALUE); + idx++; + } + + } + + public void testBatchWithMixedActions() throws Exception { + HTable table = new HTable(conf,TEST_TABLE); + + // Load some data to start + Result[] results = table.batch(constructPutRequests()); + validateSizeAndEmpty(results,keys.size()); + + // Batch: get, get, put(new col), delete, get, get of put, get of deleted, put + List actions = new ArrayList(); + + byte[] qual2 = Bytes.toBytes("qual2"); + byte[] val2 = Bytes.toBytes("putvalue2"); + + // 0 get + Get get = new Get(keys.get(10)); + get.addColumn(BYTES_FAMILY, QUALIFIER); + actions.add(get); + + // 1 get + get = new Get(keys.get(11)); + get.addColumn(BYTES_FAMILY, QUALIFIER); + actions.add(get); + + // 2 
put of new column + Put put = new Put(keys.get(10)); + put.add(BYTES_FAMILY, qual2, val2); + actions.add(put); + + // 3 delete + Delete delete = new Delete(keys.get(20)); + delete.deleteFamily(BYTES_FAMILY); + actions.add(delete); + + // 4 get + get = new Get(keys.get(30)); + get.addColumn(BYTES_FAMILY, QUALIFIER); + actions.add(get); + + // 5 get of the put in #2 (entire family) + get = new Get(keys.get(10)); + get.addFamily(BYTES_FAMILY); + actions.add(get); + + // 6 get of the delete from #3 + get = new Get(keys.get(20)); + get.addColumn(BYTES_FAMILY, QUALIFIER); + actions.add(get); + + // 7 put of new column + put = new Put(keys.get(40)); + put.add(BYTES_FAMILY, qual2, val2); + actions.add(put); + + results = table.batch(actions); + + // Validation + + validateResult(results[0]); + validateResult(results[1]); + validateEmpty(results[2]); + validateEmpty(results[3]); + validateResult(results[4]); + validateResult(results[5]); + validateResult(results[5],qual2,val2); // testing second column in #5 + validateEmpty(results[6]); // deleted + validateEmpty(results[7]); + + // validate last put, externally from the batch + get = new Get(keys.get(40)); + get.addColumn(BYTES_FAMILY, qual2); + Result r = table.get(get); + validateResult(r,qual2,val2); + } + + + //// Helper methods //// + + private void validateResult(Result r) { + validateResult(r,QUALIFIER,VALUE); + } + + private void validateResult(Result r, byte[] qual, byte[] val) { + assertTrue(r.containsColumn(BYTES_FAMILY, qual)); + assertEquals(0, Bytes.compareTo(val, r.getValue(BYTES_FAMILY, qual))); + } + + private List constructPutRequests() { + List puts = new ArrayList(); + for ( byte [] k : keys ) { + Put put = new Put(k); + put.add(BYTES_FAMILY, QUALIFIER, VALUE); + puts.add(put); + } + return puts; + } + + private void validateLoadedData(HTable table) throws IOException { + // get the data back and validate that it is correct + for (byte[] k : keys) { + Get get = new Get(k); + get.addColumn(BYTES_FAMILY, QUALIFIER); + Result r = table.get(get); + assertTrue(r.containsColumn(BYTES_FAMILY, QUALIFIER)); + assertEquals(0, Bytes.compareTo(VALUE, r.getValue(BYTES_FAMILY, QUALIFIER))); + } + } + + private void validateEmpty(Result result) { + assertTrue(result != null); + assertTrue(result.getRow() == null); + assertEquals(0,result.raw().length); + } + + private void validateSizeAndEmpty(Result[] results, int expectedSize) { + // Validate got back the same number of Result objects, all empty + assertEquals(expectedSize,results.length); + for (Result result : results) { + validateEmpty(result); + } + } + +} Index: src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java =================================================================== --- src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 939184) +++ src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy) @@ -76,10 +76,14 @@ import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.HMsg.Type; import org.apache.hadoop.hbase.Leases.LeaseStillHeldException; +import org.apache.hadoop.hbase.client.Action; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.MultiAction; +import org.apache.hadoop.hbase.client.MultiResponse; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Row; import org.apache.hadoop.hbase.client.Scan; import 
org.apache.hadoop.hbase.client.ServerConnection; import org.apache.hadoop.hbase.client.ServerConnectionManager; @@ -96,6 +100,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.InfoServer; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Sleeper; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper; @@ -1753,7 +1758,6 @@ throws IOException { if (put.getRow() == null) throw new IllegalArgumentException("update has null row"); - checkOpen(); this.requestCount.incrementAndGet(); HRegion region = getRegion(regionName); @@ -2400,6 +2404,13 @@ return fs; } + /** + * @return the shutdownHDFS + */ + public AtomicBoolean getShutdownHDFS() { + return shutdownHDFS; + } + // // Main program and support routines // @@ -2517,10 +2528,58 @@ HRegionServer.class); doMain(args, regionServerClass); } + + @Override + public MultiResponse multi(MultiAction multi) throws IOException { + MultiResponse response = new MultiResponse(); + for (Map.Entry<byte[], List<Action>> e : multi.actions.entrySet()) { + byte[] regionName = e.getKey(); + List<Action> actionsForRegion = e.getValue(); + // sort based on the row id - this helps in the case where we reach the + // end of a region, so that we don't have to try the rest of the + // actions in the list. + Collections.sort(actionsForRegion); + Row row = null; + try { + for (Action a : actionsForRegion) { + row = a.getAction(); + if (row instanceof Delete) { + delete(regionName, (Delete) row); + response.add(regionName, new Pair<Integer, Result>( + a.getOriginalIndex(), new Result())); + } else if (row instanceof Get) { + response.add(regionName, new Pair<Integer, Result>( + a.getOriginalIndex(), get(regionName, (Get) row))); + } else if (row instanceof Put) { + put(regionName, (Put) row); + response.add(regionName, new Pair<Integer, Result>( + a.getOriginalIndex(), new Result())); + } else { + throw new IOException("Invalid Action: row must be a Get, Delete or Put."); + } + } + } catch (IOException ioe) { + if (multi.size() == 1) { + throw ioe; + } else { + LOG.error("Exception found while attempting " + row.toString() + + " " + StringUtils.stringifyException(ioe)); + response.add(regionName,null); + // stop processing on this region, continue to the next. + } + } + } + + return response; + } - @Override + @Deprecated public MultiPutResponse multiPut(MultiPut puts) throws IOException { + // Consider removing this implementation, and having this method call multi() above, instead. + // Would need to convert MultiPut to MultiAction and MultiResponse to MultiPutResponse. + // So, maybe just deprecate. MultiPutResponse resp = new MultiPutResponse(); // do each region as its own. @@ -2533,4 +2592,5 @@ return resp; } + }
Index: src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java =================================================================== --- src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java (revision 939184) +++ src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java (working copy) @@ -116,6 +116,7 @@ try { server = regionServerClass.getConstructor(HBaseConfiguration.class).
newInstance(conf); + server.getShutdownHDFS().set(false); } catch (Exception e) { IOException ioe = new IOException(); ioe.initCause(e);
Index: src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java =================================================================== --- src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java (revision 939184) +++ src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java (working copy) @@ -42,10 +42,13 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.MultiAction; +import org.apache.hadoop.hbase.client.Action; +import org.apache.hadoop.hbase.client.MultiResponse; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Row; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.filter.PrefixFilter; import org.apache.hadoop.hbase.client.MultiPutResponse; import org.apache.hadoop.hbase.client.MultiPut; import org.apache.hadoop.hbase.filter.*; @@ -58,6 +61,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.util.Bytes; + /** * This is a customized version of the polymorphic hadoop * {@link ObjectWritable}. It removes UTF8 (HADOOP-414). @@ -161,9 +165,18 @@ addToMap(MultiPut.class, code++); addToMap(MultiPutResponse.class, code++); - + // List addToMap(List.class, code++); + + // + // Multi + // + addToMap(Row.class, code++); + addToMap(Action.class, code++); + addToMap(MultiAction.class, code++); + addToMap(MultiResponse.class, code++); + } private Class<?> declaredClass;
Index: src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java =================================================================== --- src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (revision 939184) +++ src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (working copy) @@ -26,6 +26,8 @@ import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.MultiAction; +import org.apache.hadoop.hbase.client.MultiResponse; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; @@ -233,7 +235,6 @@ public void unlockRow(final byte [] regionName, final long lockId) throws IOException; - /** * Method used when a master is taking the place of another failed one. * @return All regions assigned on this region server @@ -248,6 +249,13 @@ */ public HServerInfo getHServerInfo() throws IOException; + /** + * Method used for doing multiple actions (Deletes, Gets and Puts) in one call + * @param multi + * @return MultiResponse + * @throws IOException + */ + public MultiResponse multi(MultiAction multi) throws IOException; /** * Multi put for putting multiple regions worth of puts at once.
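For context, a minimal sketch of how a client could drive the new multi() RPC declared above, via the HTable.batch() API added later in this patch. The table, family, and row names are hypothetical, and this assumes the 0.20-era client classes shown in this patch (HBaseConfiguration, HTable, Put, Get, Result) plus an already-existing table:

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.util.Bytes;

public class BatchSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical table/family names; substitute your own schema.
    HTable table = new HTable(new HBaseConfiguration(), "multi_test_table");

    List<Row> actions = new ArrayList<Row>();

    Put put = new Put(Bytes.toBytes("row-a"));
    put.add(Bytes.toBytes("family"), Bytes.toBytes("qual"), Bytes.toBytes("value"));
    actions.add(put);

    Get get = new Get(Bytes.toBytes("row-b"));
    get.addColumn(Bytes.toBytes("family"), Bytes.toBytes("qual"));
    actions.add(get);

    // One call: the actions are grouped per region server into MultiAction
    // objects and shipped over the new multi() RPC in parallel.
    Result[] results = table.batch(actions);

    // results[i] lines up with actions.get(i): an empty Result for the
    // successful Put, the fetched Result for the Get, and null if an
    // action still failed after all retries.
    for (int i = 0; i < results.length; i++) {
      System.out.println("action " + i + ": "
          + (results[i] == null ? "FAILED" : "ok"));
    }
  }
}

Because the region server sorts each region's action list by row before applying it, results cannot be matched by arrival order; the per-Action originalIndex carried in MultiResponse is what keeps the Result[] aligned with the request list here.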
Index: src/java/org/apache/hadoop/hbase/client/MultiPutResponse.java =================================================================== --- src/java/org/apache/hadoop/hbase/client/MultiPutResponse.java (revision 939184) +++ src/java/org/apache/hadoop/hbase/client/MultiPutResponse.java (working copy) @@ -31,6 +31,7 @@ import java.util.ArrayList; import java.util.TreeMap; +@Deprecated public class MultiPutResponse implements Writable { public MultiPut request; // used in client code ONLY Index: src/java/org/apache/hadoop/hbase/client/HConnectionManager.java =================================================================== --- src/java/org/apache/hadoop/hbase/client/HConnectionManager.java (revision 939184) +++ src/java/org/apache/hadoop/hbase/client/HConnectionManager.java (working copy) @@ -28,6 +28,7 @@ import java.util.List; import java.util.Map; import java.util.TreeSet; +import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; @@ -55,6 +56,7 @@ import org.apache.hadoop.hbase.ipc.HRegionInterface; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.MetaUtils; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.SoftValueSortedMap; import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper; @@ -846,7 +848,7 @@ * Allows flushing the region cache. */ public void clearRegionCache() { - this.cachedRegionLocations.clear(); + this.cachedRegionLocations.clear(); } /* @@ -1088,9 +1090,13 @@ callable.instantiateServer(false); return callable.call(); } catch (Throwable t) { - t = translateException(t); + Throwable t2 = translateException(t); + if (t2 instanceof IOException) { + throw (IOException)t2; + } else { + throw new RuntimeException(t2); + } } - return null; } private HRegionLocation @@ -1127,167 +1133,37 @@ return location; } - /* - * Helper class for batch updates. - * Holds code shared doing batch puts and batch deletes. - */ - private abstract class Batch { - final HConnection c; - - private Batch(final HConnection c) { - this.c = c; - } - - /** - * This is the method subclasses must implement. - * @param currentList - * @param tableName - * @param row - * @return Count of items processed or -1 if all. - * @throws IOException - * @throws RuntimeException - */ - abstract int doCall(final List currentList, - final byte [] row, final byte [] tableName) - throws IOException, RuntimeException; - - /** - * Process the passed list. - * @param list - * @param tableName - * @return Count of how many added or -1 if all added. - * @throws IOException - */ - int process(final List list, final byte[] tableName) - throws IOException { - byte [] region = getRegionName(tableName, list.get(0).getRow(), false); - byte [] currentRegion = region; - boolean isLastRow = false; - boolean retryOnlyOne = false; - List currentList = new ArrayList(); - int i, tries; - for (i = 0, tries = 0; i < list.size() && tries < numRetries; i++) { - Row row = list.get(i); - currentList.add(row); - // If the next record goes to a new region, then we are to clear - // currentList now during this cycle. 
- isLastRow = (i + 1) == list.size(); - if (!isLastRow) { - region = getRegionName(tableName, list.get(i + 1).getRow(), false); - } - if (!Bytes.equals(currentRegion, region) || isLastRow || retryOnlyOne) { - int index = doCall(currentList, row.getRow(), tableName); - // index is == -1 if all processed successfully, else its index - // of last record successfully processed. - if (index != -1) { - if (tries == numRetries - 1) { - throw new RetriesExhaustedException("Some server, retryOnlyOne=" + - retryOnlyOne + ", index=" + index + ", islastrow=" + isLastRow + - ", tries=" + tries + ", numtries=" + numRetries + ", i=" + i + - ", listsize=" + list.size() + ", region=" + - Bytes.toStringBinary(region), currentRegion, row.getRow(), - tries, new ArrayList()); - } - tries = doBatchPause(currentRegion, tries); - i = i - currentList.size() + index; - retryOnlyOne = true; - // Reload location. - region = getRegionName(tableName, list.get(i + 1).getRow(), true); - } else { - // Reset these flags/counters on successful batch Put - retryOnlyOne = false; - tries = 0; - } - currentRegion = region; - currentList.clear(); - } + public int processBatchOfRows(final ArrayList list, final byte[] tableName, ExecutorService pool) + throws IOException { + Result[] results = new Result[list.size()]; + processBatch( (List) list, tableName, pool, results ); + + int count = 0; + for (Result r : results) { + if (r != null) { + count++; } - return i; } - - /* - * @param t - * @param r - * @param re - * @return Region name that holds passed row r - * @throws IOException - */ - private byte [] getRegionName(final byte [] t, final byte [] r, - final boolean re) - throws IOException { - HRegionLocation location = getRegionLocationForRowWithRetries(t, r, re); - return location.getRegionInfo().getRegionName(); - } - - /* - * Do pause processing before retrying... - * @param currentRegion - * @param tries - * @return New value for tries. - */ - private int doBatchPause(final byte [] currentRegion, final int tries) { - int localTries = tries; - long sleepTime = getPauseTime(tries); - if (LOG.isDebugEnabled()) { - LOG.debug("Reloading region " + Bytes.toStringBinary(currentRegion) + - " location because regionserver didn't accept updates; tries=" + - tries + " of max=" + numRetries + ", waiting=" + sleepTime + "ms"); - } - try { - Thread.sleep(sleepTime); - localTries++; - } catch (InterruptedException e) { - // continue - } - return localTries; - } + + return (count == list.size() ? -1 : count); } - public int processBatchOfRows(final ArrayList list, - final byte[] tableName) + public int processBatchOfDeletes(final List list, + final byte[] tableName, ExecutorService pool) throws IOException { - if (list.isEmpty()) return 0; - if (list.size() > 1) Collections.sort(list); - Batch b = new Batch(this) { - @Override - int doCall(final List currentList, final byte [] row, - final byte [] tableName) - throws IOException, RuntimeException { - final Put [] puts = currentList.toArray(PUT_ARRAY_TYPE); - return getRegionServerWithRetries(new ServerCallable(this.c, - tableName, row) { - public Integer call() throws IOException { - return server.put(location.getRegionInfo().getRegionName(), puts); - } - }); + Result[] results = new Result[list.size()]; + processBatch( (List) list, tableName, pool, results ); + + int count = 0; + for (Result r : results) { + if (r != null) { + count++; } - }; - return b.process(list, tableName); + } + + return (count == list.size() ? 
-1 : count); } - public int processBatchOfDeletes(final List list, - final byte[] tableName) - throws IOException { - if (list.isEmpty()) return 0; - if (list.size() > 1) Collections.sort(list); - Batch b = new Batch(this) { - @Override - int doCall(final List currentList, final byte [] row, - final byte [] tableName) - throws IOException, RuntimeException { - final Delete [] deletes = currentList.toArray(DELETE_ARRAY_TYPE); - return getRegionServerWithRetries(new ServerCallable(this.c, - tableName, row) { - public Integer call() throws IOException { - return server.delete(location.getRegionInfo().getRegionName(), - deletes); - } - }); - } - }; - return b.process(list, tableName); - } - void close(boolean stopProxy) { if (master != null) { if (stopProxy) { @@ -1305,126 +1181,181 @@ } } - public void processBatchOfPuts(List list, - final byte[] tableName, ExecutorService pool) throws IOException { - for ( int tries = 0 ; tries < numRetries && !list.isEmpty(); ++tries) { - Collections.sort(list); - Map regionPuts = - new HashMap(); - // step 1: - // break up into regionserver-sized chunks and build the data structs - for ( Put put : list ) { - byte [] row = put.getRow(); + private Callable createCallable( + final HServerAddress address, + final MultiAction multi, + final byte [] tableName) { + final HConnection connection = this; + return new Callable() { + public MultiResponse call() throws IOException { + return getRegionServerForWithoutRetries( + new ServerCallable(connection, tableName, null) { + public MultiResponse call() throws IOException { + return server.multi(multi); + } + @Override + public void instantiateServer(boolean reload) throws IOException { + server = connection.getHRegionConnection(address); + } + } + ); + } + }; + } - HRegionLocation loc = locateRegion(tableName, row, true); - HServerAddress address = loc.getServerAddress(); - byte [] regionName = loc.getRegionInfo().getRegionName(); + public Result[] processBatch(List list, + final byte[] tableName, + ExecutorService pool, + Result[] results) throws IOException { - MultiPut mput = regionPuts.get(address); - if (mput == null) { - mput = new MultiPut(address); - regionPuts.put(address, mput); - } - mput.add(regionName, put); - } + // results must be the same size as list + if (results.length != list.size()) { + throw new IOException("argument results must be the same size as argument list"); + } + + if (list.size() == 0) { + return results; + } + + List workingList = new ArrayList(); + workingList.addAll(list); + boolean singletonList = (list.size() == 1); + boolean retry = true; + Throwable singleRowCause = null; - // step 2: - // make the requests - // Discard the map, just use a list now, makes error recovery easier. - List multiPuts = new ArrayList(regionPuts.values()); + for (int tries = 0; tries < numRetries && retry; ++tries) { - List> futures = - new ArrayList>(regionPuts.size()); - for ( MultiPut put : multiPuts ) { - futures.add(pool.submit(createPutCallable(put.address, - put, - tableName))); + // sleep first, if this is a retry + if (tries >= 1) { + long sleepTime = getPauseTime(tries); + LOG.debug("Retry " +tries+ ", sleep for " +sleepTime+ "ms!"); + try { Thread.sleep(sleepTime); } catch (InterruptedException ignore) {} } - // RUN! - List failed = new ArrayList(); - // step 3: - // collect the failures and tries from step 1. 
- for (int i = 0; i < futures.size(); i++ ) { - Future future = futures.get(i); - MultiPut request = multiPuts.get(i); - try { - MultiPutResponse resp = future.get(); + // step 1: break up into regionserver-sized chunks and build the data structs - // For each region - for (Map.Entry> e : request.puts.entrySet()) { - Integer result = resp.getAnswer(e.getKey()); - if (result == null) { - // failed - LOG.debug("Failed all for region: " + - Bytes.toStringBinary(e.getKey()) + ", removing from cache"); - failed.addAll(e.getValue()); - } else if (result >= 0) { - // some failures - List lst = e.getValue(); - failed.addAll(lst.subList(result, lst.size())); - LOG.debug("Failed past " + result + " for region: " + - Bytes.toStringBinary(e.getKey()) + ", removing from cache"); - } + Map actionsByServer = new HashMap(); + for (int i=0; i> futures = + new HashMap>(actionsByServer.size()); + + for (Entry e : actionsByServer.entrySet()) { + futures.put(e.getKey(),pool.submit(createCallable(e.getKey(), e.getValue(), tableName))); + } + + // step 3: collect the failures and successes and prepare for retry - long sleepTime = getPauseTime(tries); - LOG.debug("processBatchOfPuts had some failures, sleeping for " + sleepTime + - " ms!"); + for (Entry> responsePerServer : futures.entrySet()) { + HServerAddress address = responsePerServer.getKey(); + MultiAction request = actionsByServer.get(address); + try { - Thread.sleep(sleepTime); + // Gather the results for one server + Future future = responsePerServer.getValue(); + + MultiResponse resp = future.get(); + + if (resp == null) { + // Entire server failed + LOG.debug("Failed all for server: " + address + ", removing from cache"); + } else { + // For each region + for (Map.Entry>> e : resp.getResults().entrySet()) { + byte[] regionName = e.getKey(); + List> regionResults = e.getValue(); + for (int i=0; i < regionResults.size(); i++) { + Pair regionResult = regionResults.get(i); + if (regionResult.getSecond() == null) { + // failed + LOG.debug("Failures for region: " + Bytes.toStringBinary(regionName) + ", removing from cache"); + } else { + // success + results[regionResult.getFirst()] = regionResult.getSecond(); + } + } + } + } } catch (InterruptedException e) { + LOG.debug("Failed all from " + address, e); + } catch (ExecutionException e) { + LOG.debug("Failed all from " + address, e); + // Just give up, leaving the batch incomplete + if (e.getCause() instanceof DoNotRetryIOException) { + throw (DoNotRetryIOException) e.getCause(); + } + + if (singletonList) { + // be richer for reporting in a 1 row case. + singleRowCause = e.getCause(); + } + } + } + + // Find failures (i.e. null Result), and add them to the workingList (in + // order), so they can be retried. 
+ retry = false; + workingList.clear(); + for (int i=0; i createPutCallable( - final HServerAddress address, final MultiPut puts, - final byte [] tableName) { - final HConnection connection = this; - return new Callable() { - public MultiPutResponse call() throws IOException { - return getRegionServerWithRetries( - new ServerCallable(connection, tableName, null) { - public MultiPutResponse call() throws IOException { - MultiPutResponse resp = server.multiPut(puts); - resp.request = puts; - return resp; - } - @Override - public void instantiateServer(boolean reload) throws IOException { - server = connection.getHRegionConnection(address); - } - } - ); + public void processBatchOfPuts(List list, + final byte[] tableName, + ExecutorService pool) throws IOException { + Result[] results = new Result[list.size()]; + processBatch( (List) list, tableName, pool, results ); + + // mutate list so that it is empty for complete success, or contains only failed records + // results are returned in the same order as the requests in list + // walk the list backwards, so we can remove from list without impacting the indexes of earlier members + for (int i = results.length - 1; i>=0; i--) { + // if result is not null, it succeeded + if (results[i] != null) { + list.remove(i); } - }; + } } private Throwable translateException(Throwable t) throws IOException { @@ -1440,4 +1371,4 @@ return t; } } -} +} \ No newline at end of file Index: src/java/org/apache/hadoop/hbase/client/HConnection.java =================================================================== --- src/java/org/apache/hadoop/hbase/client/HConnection.java (revision 939184) +++ src/java/org/apache/hadoop/hbase/client/HConnection.java (working copy) @@ -203,7 +203,7 @@ * @return Count of committed Puts. On fault, < list.size(). * @throws IOException */ - public int processBatchOfRows(ArrayList list, byte[] tableName) + public int processBatchOfRows(ArrayList list, byte[] tableName, ExecutorService pool) throws IOException; /** @@ -213,11 +213,43 @@ * @param tableName The name of the table * @throws IOException */ - public int processBatchOfDeletes(List list, byte[] tableName) + public int processBatchOfDeletes(List list, byte[] tableName, ExecutorService pool) throws IOException; + /** + * Process a mixed batch of Get, Put and Delete actions. All actions for a + * RegionServer are forwarded in one RPC call. + * + * @param actions + * The collection of actions. + * @param tableName + * Name of the hbase table + * @param pool + * thread pool for parallel execution + * @param results + * An empty array, same size as list. If an exception is thrown, + * you can test here for partial results, and to determine which + * actions processed successfully. + * @return The results array that was passed in + * @throws IOException + */ + public Result [] processBatch(List actions, final byte [] tableName, + ExecutorService pool, Result[] results) + throws IOException; + + /** + * Process a batch of Puts. + * + * @param list + * The collection of actions. The list is mutated: all successful + * Puts are removed from the list. 
+ * @param tableName + * Name of the hbase table + * @param pool + * thread pool for parallel execution + * @throws IOException + */ public void processBatchOfPuts(List list, final byte[] tableName, ExecutorService pool) throws IOException; - } Index: src/java/org/apache/hadoop/hbase/client/Get.java =================================================================== --- src/java/org/apache/hadoop/hbase/client/Get.java (revision 939184) +++ src/java/org/apache/hadoop/hbase/client/Get.java (working copy) @@ -61,7 +61,7 @@ *
<p>
* To add a filter, execute {@link #setFilter(Filter) setFilter}. */ -public class Get implements Writable { +public class Get implements Writable, Row, Comparable { private byte [] row = null; private long lockId = -1L; private int maxVersions = 1; @@ -349,6 +349,11 @@ return sb.toString(); } + //Row + public int compareTo(Row del) { + return Bytes.compareTo(this.getRow(), del.getRow()); + } + //Writable public void readFields(final DataInput in) throws IOException { Index: src/java/org/apache/hadoop/hbase/client/HTable.java =================================================================== --- src/java/org/apache/hadoop/hbase/client/HTable.java (revision 939184) +++ src/java/org/apache/hadoop/hbase/client/HTable.java (working copy) @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; +import java.util.concurrent.Executors; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; @@ -83,9 +84,9 @@ private long currentWriteBufferSize; protected int scannerCaching; private int maxKeyValueSize; + private ExecutorService pool; // For Multi + private long maxScannerResultSize; - private long maxScannerResultSize; - /** * Creates an object to access a HBase table. * @@ -144,14 +145,12 @@ this.autoFlush = true; this.currentWriteBufferSize = 0; this.scannerCaching = conf.getInt("hbase.client.scanner.caching", 1); - this.maxScannerResultSize = conf.getLong( HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); this.maxKeyValueSize = conf.getInt("hbase.client.keyvalue.maxsize", -1); - int nrHRS = getCurrentNrHRS(); - int nrThreads = conf.getInt("hbase.htable.threads.max", nrHRS); + int nrThreads = conf.getInt("hbase.htable.threads.max", getCurrentNrHRS()); if (nrThreads == 0) { nrThreads = 1; // this sucks but there it is. } @@ -165,19 +164,17 @@ } /** - * TODO Might want to change this to public, would be nice if the number - * of threads would automatically change when servers were added and removed + * Get the current number of region servers running. * @return the number of region servers that are currently running * @throws IOException */ private int getCurrentNrHRS() throws IOException { + // TODO Might want to change this to public, would be nice if the number + // of threads would automatically change when servers were added and removed HBaseAdmin admin = new HBaseAdmin(this.configuration); return admin.getClusterStatus().getServers(); } - // For multiput - private ExecutorService pool; - /** * Tells whether or not a table is enabled or not. * @param tableName Name of table to check. @@ -489,8 +486,35 @@ } ); } + + /** + * Method that does a batch call on Deletes, Gets and Puts. + * + * @param actions list of Get, Put, Delete objects + * @param results Empty Result[], same size as actions. Provides access to partial + * results, in case an exception is thrown. + * @return the results from the actions. A null in the return array means that + * the call for that action failed, even after retries + * @throws IOException + */ + public synchronized Result[] batch(final List actions, final Result[] results) throws IOException { + return connection.processBatch(actions, tableName, pool, results); + } /** + * Method that does a batch call on Deletes, Gets and Puts. + * + * @param actions list of Get, Put, Delete objects + * @return the results from the actions. 
A null in the return array means that + * the call for that action failed, even after retries + * @throws IOException + */ + public synchronized Result[] batch(final List actions) throws IOException { + Result[] results = new Result[actions.size()]; + return connection.processBatch(actions, tableName, pool, results); + } + + /** * Deletes the specified cells/row. * * @param delete The object that specifies what to delete. @@ -512,9 +536,8 @@ /** * Deletes the specified cells/rows in bulk. * @param deletes List of things to delete. List gets modified by this - * method (in particular it gets re-ordered, so the order in which the elements - * are inserted in the list gives no guarantee as to the order in which the - * {@link Delete}s are executed). + * method... successful {@link Delete}s are removed. The ordering of the list + * does not change. * @throws IOException if a remote or network exception occurs. In that case * the {@code deletes} argument will contain the {@link Delete} instances * that have not be successfully applied. @@ -522,11 +545,17 @@ */ public void delete(final ArrayList deletes) throws IOException { - int last = 0; - try { - last = connection.processBatchOfDeletes(deletes, this.tableName); - } finally { - deletes.subList(0, last).clear(); + Result[] results = new Result[deletes.size()]; + connection.processBatch( (List) deletes, tableName, pool, results ); + + // mutate list so that it is empty for complete success, or contains only failed records + // results are returned in the same order as the requests in list + // walk the list backwards, so we can remove from list without impacting the indexes of earlier members + for (int i = results.length - 1; i>=0; i--) { + // if result is not null, it succeeded + if (results[i] != null) { + deletes.remove(i); + } } } @@ -693,19 +722,18 @@ * Executes all the buffered {@link Put} operations. *
<p>
* This method gets called once automatically for every {@link Put} or batch - * of {@link Put}s (when {@link #put(List)} is used) when - * {@link #isAutoFlush} is {@code true}. + * of {@link Put}s (when {@link #batch(List)} is used) when + * {@link #isAutoFlush()} is {@code true}. * @throws IOException if a remote or network exception occurs. */ public void flushCommits() throws IOException { try { - connection.processBatchOfPuts(writeBuffer, - tableName, pool); + connection.processBatchOfPuts(writeBuffer, tableName, pool); } finally { - // the write buffer was adjsuted by processBatchOfPuts + // the write buffer was adjusted by processBatchOfPuts currentWriteBufferSize = 0; - for (int i = 0; i < writeBuffer.size(); i++) { - currentWriteBufferSize += writeBuffer.get(i).heapSize(); + for (Put aPut : writeBuffer) { + currentWriteBufferSize += aPut.heapSize(); } } } Index: src/java/org/apache/hadoop/hbase/client/Row.java =================================================================== --- src/java/org/apache/hadoop/hbase/client/Row.java (revision 939184) +++ src/java/org/apache/hadoop/hbase/client/Row.java (working copy) @@ -19,12 +19,15 @@ */ package org.apache.hadoop.hbase.client; +import org.apache.hadoop.io.WritableComparable; + /** * Has a row. */ -interface Row { +public interface Row extends WritableComparable { /** * @return The row. */ public byte [] getRow(); -} \ No newline at end of file + +} Index: src/java/org/apache/hadoop/hbase/client/MultiResponse.java =================================================================== --- src/java/org/apache/hadoop/hbase/client/MultiResponse.java (revision 0) +++ src/java/org/apache/hadoop/hbase/client/MultiResponse.java (revision 0) @@ -0,0 +1,112 @@ +/* + * Copyright 2009 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.hbase.io.HbaseObjectWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.HServerAddress; + +import java.io.DataOutput; +import java.io.IOException; +import java.io.DataInput; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.ArrayList; +import java.util.TreeMap; + +/** + * A container for Result objects, grouped by regionName. 
+ */ +public class MultiResponse implements Writable { + + // map of regionName to list of (Results paired to the original index for that Result) + private Map<byte[], List<Pair<Integer, Result>>> results = new TreeMap<byte[], List<Pair<Integer, Result>>>(Bytes.BYTES_COMPARATOR); + + public MultiResponse() {} + + /** + * @return Number of pairs in this container + */ + public int size() { + int size = 0; + for (Collection<Pair<Integer, Result>> c : results.values()) { + size += c.size(); + } + return size; + } + + /** + * Add the pair to the container, grouped by the regionName. + * + * @param regionName + * @param r + * First item in the pair is the original index of the Action + * (request). Second item is the Result. Result will be empty for + * successful Put and Delete actions. + */ + public void add(byte[] regionName, Pair<Integer, Result> r) { + List<Pair<Integer, Result>> rs = results.get(regionName); + if (rs == null) { + rs = new ArrayList<Pair<Integer, Result>>(); + results.put(regionName, rs); + } + rs.add(r); + } + + public Map<byte[], List<Pair<Integer, Result>>> getResults() { + return results; + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(results.size()); + for (Map.Entry<byte[], List<Pair<Integer, Result>>> e : results.entrySet()) { + Bytes.writeByteArray(out, e.getKey()); + List<Pair<Integer, Result>> lst = e.getValue(); + out.writeInt(lst.size()); + for( Pair<Integer, Result> r : lst ) { + out.writeInt(r.getFirst()); + HbaseObjectWritable.writeObject(out, r.getSecond(), Result.class, null); + } + } + } + + @Override + public void readFields(DataInput in) throws IOException { + results.clear(); + int mapSize = in.readInt(); + for (int i = 0 ; i < mapSize; i++) { + byte[] key = Bytes.readByteArray(in); + int listSize = in.readInt(); + List<Pair<Integer, Result>> lst = new ArrayList<Pair<Integer, Result>>(listSize); + for ( int j = 0 ; j < listSize; j++ ) { + Integer idx = in.readInt(); + Result r = (Result)HbaseObjectWritable.readObject(in, null); + lst.add(new Pair<Integer, Result>(idx, r)); + } + results.put(key, lst); + } + } + +}
Index: src/java/org/apache/hadoop/hbase/client/Action.java =================================================================== --- src/java/org/apache/hadoop/hbase/client/Action.java (revision 0) +++ src/java/org/apache/hadoop/hbase/client/Action.java (revision 0) @@ -0,0 +1,73 @@ +package org.apache.hadoop.hbase.client; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.hbase.io.HbaseObjectWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.Writable; + +public class Action implements Writable, Comparable { + + private byte[] regionName; + private Row action; + private int originalIndex; + private Result result; + + public Action() { + super(); + } + + public Action(byte[] regionName, Row action, int originalIndex) { + super(); + this.regionName = regionName; + this.action = action; + this.originalIndex = originalIndex; + } + + public byte[] getRegionName() { + return regionName; + } + public void setRegionName(byte[] regionName) { + this.regionName = regionName; + } + public Result getResult() { + return result; + } + public void setResult(Result result) { + this.result = result; + } + public Row getAction() { + return action; + } + public int getOriginalIndex() { + return originalIndex; + } + + @Override + public int compareTo(Object o) { + return action.compareTo(((Action) o).getAction()); + } + + ///////////////////////////////////////////////////////////////////////////// + // Writable + ///////////////////////////////////////////////////////////////////////////// + + public void write(final DataOutput out) + throws IOException { + Bytes.writeByteArray(out, regionName); + HbaseObjectWritable.writeObject(out, action,
Row.class, null); + out.writeInt(originalIndex); + HbaseObjectWritable.writeObject(out, result, Result.class, null); + } + + public void readFields(final DataInput in) + throws IOException { + this.regionName = Bytes.readByteArray(in); + this.action = (Row) HbaseObjectWritable.readObject(in, null); + this.originalIndex = in.readInt(); + this.result = (Result) HbaseObjectWritable.readObject(in, null); + } + +}
Index: src/java/org/apache/hadoop/hbase/client/MultiPut.java =================================================================== --- src/java/org/apache/hadoop/hbase/client/MultiPut.java (revision 939184) +++ src/java/org/apache/hadoop/hbase/client/MultiPut.java (working copy) @@ -33,6 +33,7 @@ import java.util.Collection; import java.util.TreeMap; +@Deprecated public class MultiPut implements Writable { public HServerAddress address; // client code ONLY
Index: src/java/org/apache/hadoop/hbase/client/MultiAction.java =================================================================== --- src/java/org/apache/hadoop/hbase/client/MultiAction.java (revision 0) +++ src/java/org/apache/hadoop/hbase/client/MultiAction.java (revision 0) @@ -0,0 +1,113 @@ +/* + * Copyright 2009 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.io.Writable; + +import org.apache.hadoop.hbase.io.HbaseObjectWritable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.HServerAddress; + +import java.io.DataOutput; +import java.io.IOException; +import java.io.DataInput; +import java.util.List; +import java.util.Map; +import java.util.ArrayList; +import java.util.TreeMap; + +/** + * Container for Actions (i.e. Get, Delete, or Put), which are grouped by regionName. + * Intended to be used with HConnectionManager.processBatch(). + */ +public class MultiAction implements Writable { + + // map of regions to lists of puts/gets/deletes for that region. + public Map<byte[], List<Action>> actions = new TreeMap<byte[], List<Action>>(Bytes.BYTES_COMPARATOR); + + public MultiAction() {} + + /** + * Get the total number of Actions + * @return total number of Actions for all groups in this container. + */ + public int size() { + int size = 0; + for (List<Action> l : actions.values()) { + size += l.size(); + } + return size; + } + + /** + * Add an Action to this container based on its regionName. If the regionName is wrong, the initial + * execution will fail, but will be automatically retried, this time attempting to locate the correct region.
+ * @param regionName + * @param a + */ + public void add(byte[] regionName, Action a) { + List<Action> rsActions = actions.get(regionName); + if (rsActions == null) { + rsActions = new ArrayList<Action>(); + actions.put(regionName, rsActions); + } + rsActions.add(a); + } + + /** + * @return All actions from all regions in this container + */ + public List<Action> allActions() { + List<Action> res = new ArrayList<Action>(); + for ( List<Action> lst : actions.values() ) { + res.addAll(lst); + } + return res; + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(actions.size()); + for( Map.Entry<byte[], List<Action>> e : actions.entrySet()) { + Bytes.writeByteArray(out, e.getKey()); + List<Action> lst = e.getValue(); + out.writeInt(lst.size()); + for( Action a : lst ) { + HbaseObjectWritable.writeObject(out, a, Action.class, null); + } + } + } + + @Override + public void readFields(DataInput in) throws IOException { + actions.clear(); + int mapSize = in.readInt(); + for (int i = 0 ; i < mapSize; i++) { + byte[] key = Bytes.readByteArray(in); + int listSize = in.readInt(); + List<Action> lst = new ArrayList<Action>(listSize); + for ( int j = 0 ; j < listSize; j++ ) { + lst.add((Action)HbaseObjectWritable.readObject(in, null)); + } + actions.put(key, lst); + } + } + +}
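To make the wire format concrete, here is a hedged round-trip sketch of the MultiAction/Action Writables defined above, serialized the way the IPC layer would on each side. The region name and row content are made up for illustration; in real use the region name comes from the client's region cache (HConnection.locateRegion), and null fields are assumed to be handled by HbaseObjectWritable's null encoding:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

import org.apache.hadoop.hbase.client.Action;
import org.apache.hadoop.hbase.client.MultiAction;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class MultiActionRoundTrip {
  public static void main(String[] args) throws Exception {
    // Made-up region name for illustration only.
    byte[] regionName = Bytes.toBytes("multi_test_table,,1270000000000");

    Put put = new Put(Bytes.toBytes("row-a"));
    put.add(Bytes.toBytes("family"), Bytes.toBytes("qual"), Bytes.toBytes("value"));

    // originalIndex = 0: the position of this action in the caller's list,
    // echoed back in MultiResponse so the client can slot the Result
    // into the right place in its Result[].
    MultiAction multi = new MultiAction();
    multi.add(regionName, new Action(regionName, put, 0));

    // Serialize, then deserialize, as the RPC transport would.
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    multi.write(new DataOutputStream(bos));

    MultiAction copy = new MultiAction();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));

    System.out.println("actions after round trip: " + copy.size()); // expect 1
  }
}

Grouping actions by region name on the wire is what lets HRegionServer.multi() sort and apply each region's actions locally, while the per-Action originalIndex keeps the client's Result[] stable regardless of that server-side sorting.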