diff --git a/src/java/org/apache/hadoop/hbase/client/HConnection.java b/src/java/org/apache/hadoop/hbase/client/HConnection.java
index ecd568d..d71be0a 100644
--- a/src/java/org/apache/hadoop/hbase/client/HConnection.java
+++ b/src/java/org/apache/hadoop/hbase/client/HConnection.java
@@ -214,4 +214,9 @@ public interface HConnection {
    */
   public int processBatchOfDeletes(List<Delete> list, byte[] tableName)
   throws IOException;
-}
\ No newline at end of file
+
+  /**
+   * Process a batch of Puts. Does the retries.
+   * @param list a batch of Puts to process
+   * @param tableName the name of the table the puts are for
+   * @throws IOException
+   */
+  public void processBatchOfPuts(List<Put> list,
+                                 final byte[] tableName) throws IOException;
+
+
+}
diff --git a/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java b/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java
index 901ac67..a8c8995 100644
--- a/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java
+++ b/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java
@@ -29,7 +29,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -257,7 +264,10 @@ public class HConnectionManager implements HConstants {
     private final Object userRegionLock = new Object();
 
     private volatile Configuration conf;
-
+
+    // For multiput
+    private ExecutorService pool;
+
     // Known region HServerAddress.toString() -> HRegionInterface
     private final Map<String, HRegionInterface> servers =
       new ConcurrentHashMap<String, HRegionInterface>();
@@ -269,6 +279,33 @@ public class HConnectionManager implements HConstants {
     private final Map<Integer, SortedMap<byte [], HRegionLocation>>
       cachedRegionLocations =
         new HashMap<Integer, SortedMap<byte [], HRegionLocation>>();
 
+    private static class DaemonThreadFactory implements ThreadFactory {
+      static final AtomicInteger poolNumber = new AtomicInteger(1);
+      final ThreadGroup group;
+      final AtomicInteger threadNumber = new AtomicInteger(1);
+      final String namePrefix;
+
+      DaemonThreadFactory() {
+        SecurityManager s = System.getSecurityManager();
+        group = (s != null)? s.getThreadGroup() :
+            Thread.currentThread().getThreadGroup();
+        namePrefix = "pool-" +
+            poolNumber.getAndIncrement() +
+            "-thread-";
+      }
+
+      public Thread newThread(Runnable r) {
+        Thread t = new Thread(group, r,
+            namePrefix + threadNumber.getAndIncrement(),
+            0);
+        if (!t.isDaemon())
+          t.setDaemon(true);
+        if (t.getPriority() != Thread.NORM_PRIORITY)
+          t.setPriority(Thread.NORM_PRIORITY);
+        return t;
+      }
+    }
+
     /**
      * constructor
      * @param conf Configuration object
@@ -298,6 +335,12 @@ public class HConnectionManager implements HConstants {
 
       this.master = null;
       this.masterChecked = false;
+
+      // default: size the pool as if talking to 10 region servers
+      int nrHRS = 10;
+      int nrThreads = conf.getInt("hbase.htable.threads.max", nrHRS);
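+
+      // daemon threads, so an unclosed connection cannot keep the JVM alive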
+      this.pool = Executors.newFixedThreadPool(nrThreads,
+          new DaemonThreadFactory());
     }
 
     private long getPauseTime(int tries) {
@@ -846,8 +889,9 @@ public class HConnectionManager implements HConstants {
         // by nature of the map, we know that the start key has to be <
         // otherwise it wouldn't be in the headMap.
-        if (KeyValue.getRowComparator(tableName).compareRows(endKey, 0, endKey.length,
-            row, 0, row.length) <= 0) {
+        if (Bytes.equals(endKey, HConstants.EMPTY_END_ROW) ||
+            KeyValue.getRowComparator(tableName).compareRows(endKey, 0, endKey.length,
+                row, 0, row.length) > 0) {
           // delete any matching entry
           HRegionLocation rl =
             tableLocations.remove(matchingRegions.lastKey());
@@ -903,7 +947,7 @@ public class HConnectionManager implements HConstants {
             " is " + location.getServerAddress());
         }
       }
-
+
     public HRegionInterface getHRegionConnection(
         HServerAddress regionServer, boolean getMaster)
     throws IOException {
@@ -1289,5 +1333,127 @@ public class HConnectionManager implements HConstants {
           }
         }
       }
-    }
-}
\ No newline at end of file
+
+    public void processBatchOfPuts(List<Put> list,
+        final byte[] tableName) throws IOException {
+      for ( int tries = 0; tries < numRetries && !list.isEmpty(); ++tries) {
+        Collections.sort(list);
+        Map<HServerAddress, MultiPut> regionPuts =
+            new HashMap<HServerAddress, MultiPut>();
+        // step 1:
+        //  break up into regionserver-sized chunks and build the data structs
+        for ( Put put : list ) {
+          byte [] row = put.getRow();
+
+          HRegionLocation loc = locateRegion(tableName, row, true);
+          HServerAddress address = loc.getServerAddress();
+          byte [] regionName = loc.getRegionInfo().getRegionName();
+
+          MultiPut mput = regionPuts.get(address);
+          if (mput == null) {
+            mput = new MultiPut(address);
+            regionPuts.put(address, mput);
+          }
+          mput.add(regionName, put);
+        }
+
+        // step 2:
+        //  make the requests
+        // Discard the map, just use a list now, makes error recovery easier.
+        List<MultiPut> multiPuts = new ArrayList<MultiPut>(regionPuts.values());
+
+        List<Future<MultiPutResponse>> futures =
+            new ArrayList<Future<MultiPutResponse>>(regionPuts.size());
+        for ( MultiPut put : multiPuts ) {
+          futures.add(pool.submit(createPutCallable(put.address,
+              put,
+              tableName)));
+        }
+        // RUN!
+        List<Put> failed = new ArrayList<Put>();
+
+        // step 3:
+        //  collect the failures and queue them for retry
+        for (int i = 0; i < futures.size(); i++ ) {
+          Future<MultiPutResponse> future = futures.get(i);
+          MultiPut request = multiPuts.get(i);
+          try {
+            MultiPutResponse resp = future.get();
+
+            // For each region
+            for (Map.Entry<byte[], List<Put>> e : request.puts.entrySet()) {
+              Integer result = resp.getAnswer(e.getKey());
+              if (result == null) {
+                // failed
+                LOG.debug("Failed all for region: " +
+                    Bytes.toStringBinary(e.getKey()) + ", removing from cache");
+                failed.addAll(e.getValue());
+              } else if (result >= 0) {
+                // some failures
+                List<Put> lst = e.getValue();
+                failed.addAll(lst.subList(result, lst.size()));
+                LOG.debug("Failed past " + result + " for region: " +
+                    Bytes.toStringBinary(e.getKey()) + ", removing from cache");
+              }
+            }
+          } catch (InterruptedException e) {
+            // go into the failed list.
+            LOG.debug("Failed all from " + request.address, e);
+            failed.addAll(request.allPuts());
+          } catch (ExecutionException e) {
+            // all go into the failed list.
+            LOG.debug("Failed all from " + request.address, e);
+            failed.addAll(request.allPuts());
+          }
+        }
+        list.clear();
+        if (!failed.isEmpty()) {
+          for (Put failedPut: failed) {
+            deleteCachedLocation(tableName, failedPut.getRow());
+          }
+
+          list.addAll(failed);
+
+          long sleepTime = getPauseTime(tries);
+          LOG.debug("processBatchOfPuts had some failures, sleeping for " + sleepTime +
+              " ms!");
+          try {
+            Thread.sleep(sleepTime);
+          } catch (InterruptedException e) {
+            // carry on with the retry loop
+          }
+        }
+      }
+      if (!list.isEmpty()) {
+        // ran out of retries and didn't succeed on everything!
+        // TODO: include detail on which regions failed the most.
+        throw new RetriesExhaustedException("Still had " + list.size() +
+            " puts left after retrying " + numRetries + " times.");
+      }
+    }
+
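+    // Wrap one region server's MultiPut in a Callable that goes through the
+    // standard getRegionServerWithRetries machinery.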
+    private Callable<MultiPutResponse> createPutCallable(
+        final HServerAddress address, final MultiPut puts,
+        final byte [] tableName) {
+      final HConnection connection = this;
+      return new Callable<MultiPutResponse>() {
+        public MultiPutResponse call() throws IOException {
+          return getRegionServerWithRetries(
+              new ServerCallable<MultiPutResponse>(connection, tableName, null) {
+                public MultiPutResponse call() throws IOException {
+                  MultiPutResponse resp = server.multiPut(puts);
+                  resp.request = puts;
+                  return resp;
+                }
+                @Override
+                public void instantiateServer(boolean reload) throws IOException {
+                  server = connection.getHRegionConnection(address);
+                }
+              }
+          );
+        }
+      };
+    }
+  }
+}
diff --git a/src/java/org/apache/hadoop/hbase/client/HTable.java b/src/java/org/apache/hadoop/hbase/client/HTable.java
index 4cddf19..77da7fd 100644
--- a/src/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/src/java/org/apache/hadoop/hbase/client/HTable.java
@@ -582,11 +585,11 @@ public class HTable implements HTableInterface {
    * @throws IOException
    */
   public void flushCommits() throws IOException {
-    int last = 0;
     try {
-      last = connection.processBatchOfRows(writeBuffer, tableName);
+      connection.processBatchOfPuts(writeBuffer,
+          tableName);
     } finally {
-      writeBuffer.subList(0, last).clear();
+      // the write buffer was adjusted by processBatchOfPuts
       currentWriteBufferSize = 0;
       for (int i = 0; i < writeBuffer.size(); i++) {
         currentWriteBufferSize += writeBuffer.get(i).heapSize();
diff --git a/src/java/org/apache/hadoop/hbase/client/MultiPut.java b/src/java/org/apache/hadoop/hbase/client/MultiPut.java
new file mode 100644
index 0000000..073d132
--- /dev/null
+++ b/src/java/org/apache/hadoop/hbase/client/MultiPut.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.HServerAddress;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.DataInput;
+import java.util.List;
+import java.util.Map;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.TreeMap;
+
+public class MultiPut implements Writable {
+  public HServerAddress address; // client code ONLY
+
+  // map of regions to lists of puts for that region.
+  public Map<byte[], List<Put>> puts =
+      new TreeMap<byte[], List<Put>>(Bytes.BYTES_COMPARATOR);
+
+  public MultiPut() {}
+
+  public MultiPut(HServerAddress a) {
+    address = a;
+  }
+
+  public int size() {
+    int size = 0;
+    for( List<Put> l : puts.values()) {
+      size += l.size();
+    }
+    return size;
+  }
+
+  public void add(byte[] regionName, Put aPut) {
+    List<Put> rsput = puts.get(regionName);
+    if (rsput == null) {
+      rsput = new ArrayList<Put>();
+      puts.put(regionName, rsput);
+    }
+    rsput.add(aPut);
+  }
+
+  public Collection<Put> allPuts() {
+    List<Put> res = new ArrayList<Put>();
+    for ( List<Put> pp : puts.values() ) {
+      res.addAll(pp);
+    }
+    return res;
+  }
+
+
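+  // Wire format: region count, then for each region its name, the number of
+  // Puts, and each serialized Put; readFields() reads back the same layout.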
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(puts.size());
+    for( Map.Entry<byte[], List<Put>> e : puts.entrySet()) {
+      Bytes.writeByteArray(out, e.getKey());
+
+      List<Put> ps = e.getValue();
+      out.writeInt(ps.size());
+      for( Put p : ps ) {
+        p.write(out);
+      }
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    puts.clear();
+
+    int mapSize = in.readInt();
+
+    for (int i = 0 ; i < mapSize; i++) {
+      byte[] key = Bytes.readByteArray(in);
+
+      int listSize = in.readInt();
+      List<Put> ps = new ArrayList<Put>(listSize);
+      for ( int j = 0 ; j < listSize; j++ ) {
+        Put put = new Put();
+        put.readFields(in);
+        ps.add(put);
+      }
+      puts.put(key, ps);
+    }
+  }
+}
diff --git a/src/java/org/apache/hadoop/hbase/client/MultiPutResponse.java b/src/java/org/apache/hadoop/hbase/client/MultiPutResponse.java
new file mode 100644
index 0000000..198c964
--- /dev/null
+++ b/src/java/org/apache/hadoop/hbase/client/MultiPutResponse.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.DataInput;
+import java.util.Map;
+import java.util.TreeMap;
+
+public class MultiPutResponse implements Writable {
+
+  public MultiPut request; // used in client code ONLY
+
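+  // regionName -> the region's answer: negative if every Put was applied,
+  // otherwise the index of the first Put that was not applied; no entry at
+  // all means the whole region failed.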
+ */ + +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.hbase.util.Bytes; + +import java.io.DataOutput; +import java.io.IOException; +import java.io.DataInput; +import java.util.Map; +import java.util.List; +import java.util.ArrayList; +import java.util.TreeMap; + +public class MultiPutResponse implements Writable { + + public MultiPut request; // used in client code ONLY + + public Map answers = new TreeMap(Bytes.BYTES_COMPARATOR); + + public MultiPutResponse() {} + + public void addResult(byte[] regionName, int result) { + answers.put(regionName, result); + } + + public Integer getAnswer(byte[] region) { + return answers.get(region); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(answers.size()); + for( Map.Entry e : answers.entrySet()) { + Bytes.writeByteArray(out, e.getKey()); + out.writeInt(e.getValue()); + } + } + + @Override + public void readFields(DataInput in) throws IOException { + answers.clear(); + + int mapSize = in.readInt(); + for( int i = 0 ; i < mapSize ; i++ ) { + byte[] key = Bytes.readByteArray(in); + int value = in.readInt(); + + answers.put(key, value); + } + } +} diff --git a/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java b/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java index 4c2998a..3d7a077 100644 --- a/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java +++ b/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java @@ -44,6 +44,8 @@ import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.MultiPutResponse; +import org.apache.hadoop.hbase.client.MultiPut; import org.apache.hadoop.hbase.filter.*; import org.apache.hadoop.hbase.io.HbaseMapWritable; import org.apache.hadoop.io.MapWritable; @@ -151,6 +153,9 @@ public class HbaseObjectWritable implements Writable, Configurable { addToMap(FirstKeyOnlyFilter.class, code++); addToMap(Delete [].class, code++); + + addToMap(MultiPut.class, code++); + addToMap(MultiPutResponse.class, code++); } private Class declaredClass; diff --git a/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java b/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java index 770e906..902cf89 100644 --- a/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java +++ b/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java @@ -29,6 +29,8 @@ import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.MultiPutResponse; +import org.apache.hadoop.hbase.client.MultiPut; import org.apache.hadoop.hbase.regionserver.HRegion; /** @@ -245,4 +247,15 @@ public interface HRegionInterface extends HBaseRPCProtocolVersion { * @throws IOException */ public HServerInfo getHServerInfo() throws IOException; + + + /** + * Multi put for putting multiple regions worth of puts at once. 
+        Integer lock = getLockFromId(puts[i].getLockId());
+        region.put(puts[i], lock);
       }
     } catch (WrongRegionException ex) {
       LOG.debug("Batch puts: " + i, ex);
       return i;
     } catch (NotServingRegionException ex) {
+      LOG.debug("Batch puts: " + i, ex);
       return i;
     } catch (Throwable t) {
       throw convertThrowableToIOE(cleanup(t));
@@ -2406,4 +2408,20 @@ public class HRegionServer implements HConstants, HRegionInterface,
     doMain(args, regionServerClass);
   }
 
+
+  @Override
+  public MultiPutResponse multiPut(MultiPut puts) throws IOException {
+    MultiPutResponse resp = new MultiPutResponse();
+
+    // do each region as its own.
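+    // A region that fails partway reports the index of its first failed Put
+    // through resp; the other regions in the batch are unaffected.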
+    for( Map.Entry<byte[], List<Put>> e : puts.puts.entrySet()) {
+      int result = put(e.getKey(), e.getValue().toArray(new Put[]{}));
+      resp.addResult(e.getKey(), result);
+
+      e.getValue().clear(); // clear some RAM
+    }
+
+    return resp;
+  }
+
 }
diff --git a/src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java b/src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java
index 06a45d7..2e72fe2 100644
--- a/src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java
+++ b/src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java
@@ -82,6 +82,8 @@ public abstract class HBaseClusterTestCase extends HBaseTestCase {
   }
 
   /**
+   * Subclass hook.
+   *
    * Run after dfs is ready but before hbase cluster is started up.
    */
   protected void preHBaseClusterSetup() throws Exception {
diff --git a/src/test/org/apache/hadoop/hbase/MultiRegionTable.java b/src/test/org/apache/hadoop/hbase/MultiRegionTable.java
index f7d2ba4..65e2ea8 100644
--- a/src/test/org/apache/hadoop/hbase/MultiRegionTable.java
+++ b/src/test/org/apache/hadoop/hbase/MultiRegionTable.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.hbase.util.Bytes;
  * Utility class to build a table of multiple regions.
  */
 public class MultiRegionTable extends HBaseClusterTestCase {
-  private static final byte [][] KEYS = {
+  protected static final byte [][] KEYS = {
     HConstants.EMPTY_BYTE_ARRAY,
     Bytes.toBytes("bbb"),
     Bytes.toBytes("ccc"),
@@ -63,8 +63,13 @@ public class MultiRegionTable extends HBaseClusterTestCase {
    * @param familyName the family to populate.
    */
   public MultiRegionTable(final String familyName) {
-    super();
-    this.columnFamily = Bytes.toBytes(familyName);
+    this(1, familyName);
+  }
+
+  public MultiRegionTable(int nServers, final String familyName) {
+    super(nServers);
+
+    this.columnFamily = Bytes.toBytes(familyName);
     // These are needed for the new and improved Map/Reduce framework
     System.setProperty("hadoop.log.dir", conf.get("hadoop.log.dir"));
     conf.set("mapred.output.dir", conf.get("hadoop.tmp.dir"));
diff --git a/src/test/org/apache/hadoop/hbase/TestMultiParallelPut.java b/src/test/org/apache/hadoop/hbase/TestMultiParallelPut.java
new file mode 100644
index 0000000..89cc869
--- /dev/null
+++ b/src/test/org/apache/hadoop/hbase/TestMultiParallelPut.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.util.List;
+import java.util.ArrayList;
+
+public class TestMultiParallelPut extends MultiRegionTable {
+  private static final byte[] VALUE = Bytes.toBytes("value");
+  private static final byte[] QUALIFIER = Bytes.toBytes("qual");
+  private static final String FAMILY = "family";
+  private static final String TEST_TABLE = "test_table";
+  private static final byte[] BYTES_FAMILY = Bytes.toBytes(FAMILY);
+
+
+  public TestMultiParallelPut() {
+    super(2, FAMILY);
+    desc = new HTableDescriptor(TEST_TABLE);
+    desc.addFamily(new HColumnDescriptor(FAMILY));
+
+    makeKeys();
+  }
+
+  private void makeKeys() {
+    // append one byte to each region start key so every probe row lands
+    // strictly inside a region
+    for (byte [] k : KEYS) {
+      byte [] cp = new byte[k.length+1];
+      System.arraycopy(k, 0, cp, 0, k.length);
+      cp[k.length] = 1;
+
+      keys.add(cp);
+    }
+  }
+
+  List<byte[]> keys = new ArrayList<byte[]>();
+
+  public void testMultiPut() throws Exception {
+
+    HTable table = new HTable(TEST_TABLE);
+    table.setAutoFlush(false);
+    table.setWriteBufferSize(10 * 1024 * 1024);
+
+    for ( byte [] k : keys ) {
+      Put put = new Put(k);
+      put.add(BYTES_FAMILY, QUALIFIER, VALUE);
+
+      table.put(put);
+    }
+
+    table.flushCommits();
+
+    for (byte [] k : keys ) {
+      Get get = new Get(k);
+      get.addColumn(BYTES_FAMILY, QUALIFIER);
+
+      Result r = table.get(get);
+
+      assertTrue(r.containsColumn(BYTES_FAMILY, QUALIFIER));
+      assertEquals(0,
+          Bytes.compareTo(VALUE,
+              r.getValue(BYTES_FAMILY, QUALIFIER)));
+    }
+
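+    // sanity-check that the cluster really ran with two region servers and
+    // that the table stayed spread across many regions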
+    HBaseAdmin admin = new HBaseAdmin(conf);
+    ClusterStatus cs = admin.getClusterStatus();
+
+    assertEquals(2, cs.getServers());
+    for ( HServerInfo info : cs.getServerInfo()) {
+      System.out.println(info);
+      assertTrue(info.getLoad().getNumberOfRegions() > 10);
+    }
+  }
+}