diff --git a/src/java/org/apache/hadoop/hbase/client/HConnection.java b/src/java/org/apache/hadoop/hbase/client/HConnection.java
index 75f81ff..c65419a 100644
--- a/src/java/org/apache/hadoop/hbase/client/HConnection.java
+++ b/src/java/org/apache/hadoop/hbase/client/HConnection.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.hbase.client;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HServerAddress;
@@ -208,4 +210,20 @@ public interface HConnection {
    */
  public int processBatchOfDeletes(ArrayList<Delete> list, byte[] tableName)
  throws IOException;
-}
\ No newline at end of file
+
+  /**
+   * Process a batch of Puts: group them into one MultiPut per region
+   * server and dispatch the requests in parallel on the given pool.
+   * @param list puts to apply; successful puts are removed as the call proceeds
+   * @param tableName table the puts belong to
+   * @param pool thread pool the per-server requests are submitted to
+   * @param timeout how long, in seconds, to wait for each multi-put request
+   * @throws IOException if the retries are exhausted
+   */
+  public void processBatchOfPuts(List<Put> list,
+                                 final byte[] tableName,
+                                 ExecutorService pool,
+                                 int timeout) throws IOException;
+
+
+}
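For reference, a minimal sketch of how client code might drive the new interface method directly; the table name, column layout, pool size, and timeout below are invented for the example, and HTable wires all of this up automatically further down in the patch:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class BatchPutExample {
  public static void main(String[] args) throws Exception {
    HBaseConfiguration conf = new HBaseConfiguration();
    HConnection conn = HConnectionManager.getConnection(conf);
    ExecutorService pool = Executors.newFixedThreadPool(4);
    try {
      List<Put> puts = new ArrayList<Put>();
      for (int i = 0; i < 1000; i++) {
        Put p = new Put(Bytes.toBytes("row-" + i));
        p.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes(i));
        puts.add(p);
      }
      // Groups the puts per region server, sends them in parallel on the
      // pool, and retries failures; throws RetriesExhaustedException if
      // some puts never succeed.
      conn.processBatchOfPuts(puts, Bytes.toBytes("mytable"), pool, 30);
    } finally {
      pool.shutdown();
    }
  }
}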
diff --git a/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java b/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java
index 35688e4..e05929c 100644
--- a/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java
+++ b/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java
@@ -28,6 +28,15 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeSet;
+import java.util.WeakHashMap;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -51,6 +60,7 @@ import org.apache.hadoop.hbase.ipc.HMasterInterface;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.MetaUtils;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.SoftValueSortedMap;
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
@@ -845,8 +855,9 @@ public class HConnectionManager implements HConstants {
       // by nature of the map, we know that the start key has to be <
       // otherwise it wouldn't be in the headMap.
-      if (KeyValue.getRowComparator(tableName).compareRows(endKey, 0, endKey.length,
-          row, 0, row.length) <= 0) {
+      if (Bytes.equals(endKey, HConstants.EMPTY_END_ROW) ||
+          KeyValue.getRowComparator(tableName).compareRows(endKey, 0, endKey.length,
+              row, 0, row.length) > 0) {
         // delete any matching entry
         HRegionLocation rl =
           tableLocations.remove(matchingRegions.lastKey());
@@ -1286,5 +1294,138 @@ public class HConnectionManager implements HConstants {
         }
       }
     }
-  }
+
+    public void processBatchOfPuts(List<Put> list,
+        final byte[] tableName,
+        ExecutorService pool,
+        int timeout) throws IOException {
+      for ( int tries = 0 ; tries < numRetries && !list.isEmpty(); ++tries) {
+        Collections.sort(list);
+        Map<HServerAddress, MultiPut> regionPuts =
+            new HashMap<HServerAddress, MultiPut>();
+        // step 1:
+        // break up into regionserver-sized chunks and build the data structs
+        for ( Put put : list ) {
+          byte [] row = put.getRow();
+
+          HRegionLocation loc = locateRegion(tableName, row, true);
+          HServerAddress address = loc.getServerAddress();
+          byte [] regionName = loc.getRegionInfo().getRegionName();
+
+          MultiPut mput = regionPuts.get(address);
+          if (mput == null) {
+            mput = new MultiPut(address);
+            regionPuts.put(address, mput);
+          }
+          mput.add(regionName, put);
+        }
+
+        // step 2:
+        // make the requests
+        // Discard the map, just use a list now, makes error recovery easier.
+        List<MultiPut> multiPuts = new ArrayList<MultiPut>(regionPuts.values());
+
+        List<Future<MultiPutResponse>> futures =
+            new ArrayList<Future<MultiPutResponse>>(regionPuts.size());
+        for ( MultiPut put : multiPuts ) {
+          //System.out.println("Multiput: " + put.address + " of size: " + put.puts.size());
+          futures.add(pool.submit(createPutCallable(put.address,
+              put,
+              tableName)));
+        }
+        // RUN!
+        List<Put> failed = new ArrayList<Put>();
+
+        // step 3:
+        // collect the failures and successes from the responses
+        for (int i = 0; i < futures.size(); i++ ) {
+          Future<MultiPutResponse> future = futures.get(i);
+          MultiPut request = multiPuts.get(i);
+          try {
+            MultiPutResponse resp = future.get(timeout, TimeUnit.SECONDS);
+
+            // For each region
+            for (Map.Entry<byte[], List<Put>> e : request.puts.entrySet()) {
+              Integer result = resp.getAnswer(e.getKey());
+              if (result == null) {
+                // failed
+                LOG.debug("Failed all for region: " +
+                    Bytes.toStringBinary(e.getKey()) + ", removing from cache");
+                failed.addAll(e.getValue());
+              } else if (result >= 0) {
+                // some failures
+                List<Put> lst = e.getValue();
+                failed.addAll(lst.subList(result, lst.size()));
+                LOG.debug("Failed past " + result + " for region: " +
+                    Bytes.toStringBinary(e.getKey()) + ", removing from cache");
+              }
+            }
+          } catch (InterruptedException e) {
+            // go into the failed list.
+            LOG.debug("Failed all from " + request.address + " due to InterruptedException");
+            failed.addAll(request.allPuts());
+          } catch (ExecutionException e) {
+            // all go into the failed list.
+            LOG.debug("Failed all from " + request.address + " due to ExecutionException", e);
+            failed.addAll(request.allPuts());
+          } catch (TimeoutException e) {
+            // all these go into the failed list.
+            LOG.debug("Failed all from " + request.address + " due to TimeoutException");
+            failed.addAll(request.allPuts());
+          }
+        }
+        list.clear();
+        if (!failed.isEmpty()) {
+          for (Put failedPut: failed) {
+            deleteCachedLocation(tableName, failedPut.getRow());
+          }
+
+          list.addAll(failed);
+
+          long sleepTime = getPauseTime(tries);
+          // sleep...
+          LOG.debug("processBatchOfPuts had some failures, sleeping for " + sleepTime +
+              " ms!");
+          try {
+            Thread.sleep(sleepTime);
+          } catch (InterruptedException e) {
+            // ignore and retry the remaining puts
+          }
+        }
+      }
+      if (!list.isEmpty()) {
+        // ran out of retries and didn't succeed on everything!
+        // TODO: report which regions failed the most.
+        throw new RetriesExhaustedException("Still had " + list.size() +
+            " puts left after retrying " + numRetries + " times.");
+      }
+    }
+
+
+    private Callable<MultiPutResponse> createPutCallable(
+        final HServerAddress address, final MultiPut puts,
+        final byte [] tableName) {
+      //System.out.println("Creating put callable: " + address);
+      final HConnection connection = this;
+      return new Callable<MultiPutResponse>() {
+        public MultiPutResponse call() throws IOException {
+          return getRegionServerWithRetries(
+              new ServerCallable<MultiPutResponse>(connection, tableName, null) {
+                public MultiPutResponse call() throws IOException {
+                  //LOG.debug("Doing put to server: " + address + ", size: " + puts.size());
+                  MultiPutResponse resp = server.multiPut(puts);
+                  resp.request = puts;
+                  //LOG.debug("Finished put to server: " + address);
+                  return resp;
+                }
+                @Override
+                public void instantiateServer(boolean reload) throws IOException {
+                  server = connection.getHRegionConnection(address);
+                }
+              }
+          );
+        }
+      };
+    }
+  }
 }
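The per-region answer in a MultiPutResponse (defined below) is either null, meaning every put for that region failed, or the index of the first put that was not applied; everything before that index succeeded. A standalone illustration of how the collection loop above splits a region's put list on that index (plain strings stand in for Puts, and the answer value is made up):

import java.util.Arrays;
import java.util.List;

public class ResponseIndexDemo {
  public static void main(String[] args) {
    List<String> puts = Arrays.asList("p0", "p1", "p2", "p3", "p4");
    Integer answer = 3; // hypothetical: server applied p0..p2, failed at index 3

    if (answer == null) {
      System.out.println("retry all: " + puts);
    } else if (answer >= 0) {
      // an answer equal to puts.size() means everything was applied
      System.out.println("retry tail: " + puts.subList(answer, puts.size()));
    }
  }
}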
diff --git a/src/java/org/apache/hadoop/hbase/client/HTable.java b/src/java/org/apache/hadoop/hbase/client/HTable.java
index f3cb47f..b0d0c6d 100644
--- a/src/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/src/java/org/apache/hadoop/hbase/client/HTable.java
@@ -26,6 +26,8 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -71,6 +73,10 @@ public class HTable {
   private long currentWriteBufferSize;
   protected int scannerCaching;
 
+  private ExecutorService pool;
+  private int timeout;
+  private int nrHRS;
+
   /**
    * Creates an object to access a HBase table
    *
@@ -129,6 +135,11 @@ public class HTable {
     this.autoFlush = true;
     this.currentWriteBufferSize = 0;
     this.scannerCaching = conf.getInt("hbase.client.scanner.caching", 1);
+
+    this.nrHRS = getCurrentNrHRS();
+    int nrThreads = conf.getInt("hbase.htable.threads.max", nrHRS);
+    this.pool = Executors.newFixedThreadPool(nrThreads);
+    this.timeout = conf.getInt("hbase.htable.timeout", 30);
   }
 
   /**
@@ -600,11 +611,13 @@ public class HTable {
    * @throws IOException
    */
  public void flushCommits() throws IOException {
-    int last = 0;
    try {
-      last = connection.processBatchOfRows(writeBuffer, tableName);
+      connection.processBatchOfPuts(writeBuffer,
+          tableName,
+          pool,
+          timeout);
    } finally {
-      writeBuffer.subList(0, last).clear();
+      // the write buffer was adjusted by processBatchOfPuts
      currentWriteBufferSize = 0;
      for (int i = 0; i < writeBuffer.size(); i++) {
       currentWriteBufferSize += writeBuffer.get(i).heapSize();
@@ -619,6 +632,7 @@ public class HTable {
    */
  public void close() throws IOException{
    flushCommits();
+    pool.shutdownNow();
  }
 
  /**
@@ -717,6 +731,33 @@ public class HTable {
    return writeBuffer;
  }
 
+  /**
+   * Set the timeout for multi-put requests.
+   * @param timeout the new timeout, in seconds
+   */
+  public void setTimeout(int timeout) {
+    this.timeout = timeout;
+  }
+
+  /**
+   * Get the timeout for multi-put requests.
+   * @return the current timeout, in seconds
+   */
+  public int getTimeout() {
+    return timeout;
+  }
+
+  /**
+   * TODO Might want to change this to public; it would be nice if the number
+   * of threads changed automatically as servers were added and removed.
+   * @return the number of region servers that are currently running
+   * @throws IOException
+   */
+  private int getCurrentNrHRS() throws IOException {
+    HBaseAdmin admin = new HBaseAdmin(configuration);
+    return admin.getClusterStatus().getServers();
+  }
+
  // Old API. Pre-hbase-880, hbase-1304.
 
  /**
@@ -2186,4 +2227,7 @@
    }
    return n;
  }
+
+
+
 }
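The constructor above introduces two client-visible knobs, hbase.htable.threads.max and hbase.htable.timeout. A minimal sketch of tuning them from application code; the values and the table name are arbitrary:

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;

public class PoolConfigExample {
  public static void main(String[] args) throws Exception {
    HBaseConfiguration conf = new HBaseConfiguration();
    // Pool size; defaults to the number of live region servers.
    conf.setInt("hbase.htable.threads.max", 16);
    // Per-multi-put timeout, in seconds; defaults to 30.
    conf.setInt("hbase.htable.timeout", 60);

    HTable table = new HTable(conf, "mytable");
    table.setAutoFlush(false); // buffer puts so flushCommits() can batch them
    try {
      // ... issue puts ...
    } finally {
      table.close(); // flushes the buffer, then shuts the pool down
    }
  }
}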
diff --git a/src/java/org/apache/hadoop/hbase/client/MultiPut.java b/src/java/org/apache/hadoop/hbase/client/MultiPut.java
new file mode 100644
index 0000000..073d132
--- /dev/null
+++ b/src/java/org/apache/hadoop/hbase/client/MultiPut.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.HServerAddress;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.DataInput;
+import java.util.List;
+import java.util.Map;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.TreeMap;
+
+public class MultiPut implements Writable {
+  public HServerAddress address; // client code ONLY
+
+  // map of regions to lists of puts for that region.
+  public Map<byte[], List<Put>> puts = new TreeMap<byte[], List<Put>>(Bytes.BYTES_COMPARATOR);
+
+  public MultiPut() {}
+
+  public MultiPut(HServerAddress a) {
+    address = a;
+  }
+
+  public int size() {
+    int size = 0;
+    for( List<Put> l : puts.values()) {
+      size += l.size();
+    }
+    return size;
+  }
+
+  public void add(byte[] regionName, Put aPut) {
+    List<Put> rsput = puts.get(regionName);
+    if (rsput == null) {
+      rsput = new ArrayList<Put>();
+      puts.put(regionName, rsput);
+    }
+    rsput.add(aPut);
+  }
+
+  public Collection<Put> allPuts() {
+    List<Put> res = new ArrayList<Put>();
+    for ( List<Put> pp : puts.values() ) {
+      res.addAll(pp);
+    }
+    return res;
+  }
+
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(puts.size());
+    for( Map.Entry<byte[], List<Put>> e : puts.entrySet()) {
+      Bytes.writeByteArray(out, e.getKey());
+
+      List<Put> ps = e.getValue();
+      out.writeInt(ps.size());
+      for( Put p : ps ) {
+        p.write(out);
+      }
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    puts.clear();
+
+    int mapSize = in.readInt();
+
+    for (int i = 0 ; i < mapSize; i++) {
+      byte[] key = Bytes.readByteArray(in);
+
+      int listSize = in.readInt();
+      List<Put> ps = new ArrayList<Put>(listSize);
+      for ( int j = 0 ; j < listSize; j++ ) {
+        Put put = new Put();
+        put.readFields(in);
+        ps.add(put);
+      }
+      puts.put(key, ps);
+    }
+  }
+}
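Since MultiPut travels over RPC as a Writable, a quick serialization round trip makes a reasonable unit-test sketch. The region name below is invented, and note that only the puts map is serialized; the address field is client-side only and is dropped on the wire:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.hbase.client.MultiPut;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class MultiPutRoundTrip {
  public static void main(String[] args) throws IOException {
    MultiPut mp = new MultiPut();
    Put p = new Put(Bytes.toBytes("row1"));
    p.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
    mp.add(Bytes.toBytes("testtable,,1234567890"), p); // hypothetical region name

    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    mp.write(new DataOutputStream(bos));

    MultiPut copy = new MultiPut();
    copy.readFields(new DataInputStream(
        new ByteArrayInputStream(bos.toByteArray())));
    System.out.println("puts after round trip: " + copy.size()); // expect 1
  }
}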
diff --git a/src/java/org/apache/hadoop/hbase/client/MultiPutResponse.java b/src/java/org/apache/hadoop/hbase/client/MultiPutResponse.java
new file mode 100644
index 0000000..198c964
--- /dev/null
+++ b/src/java/org/apache/hadoop/hbase/client/MultiPutResponse.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.DataInput;
+import java.util.Map;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.TreeMap;
+
+public class MultiPutResponse implements Writable {
+
+  public MultiPut request; // used in client code ONLY
+
+  public Map<byte[], Integer> answers = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+
+  public MultiPutResponse() {}
+
+  public void addResult(byte[] regionName, int result) {
+    answers.put(regionName, result);
+  }
+
+  public Integer getAnswer(byte[] region) {
+    return answers.get(region);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(answers.size());
+    for( Map.Entry<byte[], Integer> e : answers.entrySet()) {
+      Bytes.writeByteArray(out, e.getKey());
+      out.writeInt(e.getValue());
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    answers.clear();
+
+    int mapSize = in.readInt();
+    for( int i = 0 ; i < mapSize ; i++ ) {
+      byte[] key = Bytes.readByteArray(in);
+      int value = in.readInt();
+
+      answers.put(key, value);
+    }
+  }
+}
diff --git a/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java b/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
index d78797f..6e6f1b7 100644
--- a/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
+++ b/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
@@ -44,7 +44,8 @@ import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.PrefixFilter;
+import org.apache.hadoop.hbase.client.MultiPutResponse;
+import org.apache.hadoop.hbase.client.MultiPut;
 import org.apache.hadoop.hbase.filter.*;
 import org.apache.hadoop.hbase.io.HbaseMapWritable;
 import org.apache.hadoop.io.MapWritable;
@@ -154,6 +155,9 @@ public class HbaseObjectWritable implements Writable, Configurable {
     addToMap(FirstKeyOnlyFilter.class, code++);
 
     addToMap(Delete [].class, code++);
+
+    addToMap(MultiPut.class, code++);
+    addToMap(MultiPutResponse.class, code++);
   }
 
   private Class<?> declaredClass;
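Note that the two addToMap calls are appended after all existing entries. Each call consumes the next integer code, and both ends of the connection presumably have to agree on the class-to-code numbering for RPC serialization, so registering the new classes anywhere but the end would renumber everything after them and break mixed-version clients. A toy model of that registration scheme, with made-up classes:

import java.util.HashMap;
import java.util.Map;

public class CodeMapDemo {
  static int code = 0;
  static Map<Class<?>, Integer> classToCode = new HashMap<Class<?>, Integer>();

  static void addToMap(Class<?> clazz) {
    classToCode.put(clazz, code++); // each registration burns the next code
  }

  public static void main(String[] args) {
    addToMap(String.class);  // pre-existing entries keep their codes...
    addToMap(Integer.class);
    addToMap(Long.class);    // ...and new entries take the next free code
    System.out.println(classToCode);
  }
}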
diff --git a/src/java/org/apache/hadoop/hbase/ipc/HBaseRPCProtocolVersion.java b/src/java/org/apache/hadoop/hbase/ipc/HBaseRPCProtocolVersion.java
index 793a6f1..b7fb103 100644
--- a/src/java/org/apache/hadoop/hbase/ipc/HBaseRPCProtocolVersion.java
+++ b/src/java/org/apache/hadoop/hbase/ipc/HBaseRPCProtocolVersion.java
@@ -73,7 +73,8 @@
    * <li>Version 19: Added getClusterStatus().</li>
    * <li>Version 20: Backed Transaction HBase out of HBase core.</li>
    * <li>Version 21: HBASE-1665.</li>
+   * <li>Version 22: HBASE-2066: Parallelize puts.</li>
    * </ul>
    */
-  public static final long versionID = 21L;
+  public static final long versionID = 22L;
 }
diff --git a/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java b/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
index 770e906..902cf89 100644
--- a/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
+++ b/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
@@ -29,6 +29,8 @@ import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.MultiPutResponse;
+import org.apache.hadoop.hbase.client.MultiPut;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 
 /**
@@ -245,4 +247,15 @@ public interface HRegionInterface extends HBaseRPCProtocolVersion {
    * @throws IOException
    */
  public HServerInfo getHServerInfo() throws IOException;
+
+
+  /**
+   * Multi put for putting multiple regions' worth of puts at once.
+   *
+   * @param puts the request
+   * @return the reply
+   * @throws IOException
+   */
+  public MultiPutResponse multiPut(MultiPut puts) throws IOException;
+
 }
diff --git a/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 03dccc7..7b0eeb6 100644
--- a/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -83,6 +83,8 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.ServerConnection;
 import org.apache.hadoop.hbase.client.ServerConnectionManager;
+import org.apache.hadoop.hbase.client.MultiPutResponse;
+import org.apache.hadoop.hbase.client.MultiPut;
 import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
 import org.apache.hadoop.hbase.ipc.HBaseRPC;
 import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
@@ -1776,6 +1782,7 @@ public class HRegionServer implements HConstants, HRegionInterface,
       LOG.debug("Batch puts: " + i, ex);
       return i;
     } catch (NotServingRegionException ex) {
+      LOG.debug("Batch puts: " + i, ex);
       return i;
     } catch (Throwable t) {
       throw convertThrowableToIOE(cleanup(t));
@@ -2487,4 +2506,18 @@ public class HRegionServer implements HConstants, HRegionInterface,
     doMain(args, regionServerClass);
   }
 
+
+  @Override
+  public MultiPutResponse multiPut(MultiPut puts) throws IOException {
+    MultiPutResponse resp = new MultiPutResponse();
+
+    // execute each region's puts as its own batch.
+    for( Map.Entry<byte[], List<Put>> e: puts.puts.entrySet()) {
+      int result = put(e.getKey(), e.getValue().toArray(new Put[]{}));
+      resp.addResult(e.getKey(), result);
+    }
+
+    return resp;
+  }
+
 }
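Putting the pieces together, an end-to-end sketch of the new write path from the application's point of view; the table, column family, and sizes are invented, and getWriteBuffer() still holds whatever could not be applied if the retries run out:

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.util.Bytes;

public class ParallelPutExample {
  public static void main(String[] args) throws Exception {
    HBaseConfiguration conf = new HBaseConfiguration();
    HTable table = new HTable(conf, "usertable");
    table.setAutoFlush(false);                 // buffer puts client-side
    table.setWriteBufferSize(4 * 1024 * 1024); // flush in large batches

    try {
      for (int i = 0; i < 100000; i++) {
        Put put = new Put(Bytes.toBytes(String.format("user%08d", i)));
        put.add(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(25L));
        table.put(put); // may trigger a parallel flush once the buffer fills
      }
      table.flushCommits(); // one MultiPut per region server, in parallel
    } catch (RetriesExhaustedException e) {
      // puts that never succeeded are still sitting in the write buffer
      System.err.println("gave up on " + table.getWriteBuffer().size() + " puts");
      table.getWriteBuffer().clear(); // so close() does not re-flush them
    } finally {
      table.close();
    }
  }
}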