diff --git a/src/java/org/apache/hadoop/hbase/client/HConnection.java b/src/java/org/apache/hadoop/hbase/client/HConnection.java index 75f81ff..a43f500 100644 --- a/src/java/org/apache/hadoop/hbase/client/HConnection.java +++ b/src/java/org/apache/hadoop/hbase/client/HConnection.java @@ -21,6 +21,8 @@ package org.apache.hadoop.hbase.client; import java.io.IOException; import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HServerAddress; @@ -206,6 +208,11 @@ public interface HConnection { * @param tableName The name of the table * @throws IOException */ - public int processBatchOfDeletes(ArrayList list, byte[] tableName) + public int processBatchOfDeletes(List list, byte[] tableName) throws IOException; -} \ No newline at end of file + + public void processBatchOfPuts(List list, + final byte[] tableName, ExecutorService pool) throws IOException; + + +} diff --git a/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java b/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java index 0075ee3..d21eb5d 100644 --- a/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java +++ b/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java @@ -29,6 +29,10 @@ import java.util.List; import java.util.Map; import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; @@ -652,6 +656,8 @@ public class HConnectionManager implements HConstants { if (location != null) { return location; } + } else { + deleteCachedLocation(tableName, row); } // build the key of the meta region we should be looking for. @@ -840,37 +846,40 @@ public class HConnectionManager implements HConstants { * requirements. */ private void deleteCachedLocation(final byte [] tableName, - final byte [] row) { - SoftValueSortedMap tableLocations = - getTableLocations(tableName); - - // start to examine the cache. we can only do cache actions - // if there's something in the cache for this table. - if (!tableLocations.isEmpty()) { - // cut the cache so that we only get the part that could contain - // regions that match our key - SoftValueSortedMap matchingRegions = - tableLocations.headMap(row); - - // if that portion of the map is empty, then we're done. otherwise, - // we need to examine the cached location to verify that it is - // a match by end key as well. - if (!matchingRegions.isEmpty()) { - HRegionLocation possibleRegion = - matchingRegions.get(matchingRegions.lastKey()); - byte [] endKey = possibleRegion.getRegionInfo().getEndKey(); - - // by nature of the map, we know that the start key has to be < - // otherwise it wouldn't be in the headMap. 
- if (KeyValue.getRowComparator(tableName).compareRows(endKey, 0, endKey.length, - row, 0, row.length) <= 0) { - // delete any matching entry - HRegionLocation rl = - tableLocations.remove(matchingRegions.lastKey()); - if (rl != null && LOG.isDebugEnabled()) { - LOG.debug("Removed " + rl.getRegionInfo().getRegionNameAsString() + - " for tableName=" + Bytes.toString(tableName) + " from cache " + - "because of " + Bytes.toStringBinary(row)); + final byte [] row) { + synchronized (this.cachedRegionLocations) { + SoftValueSortedMap tableLocations = + getTableLocations(tableName); + + // start to examine the cache. we can only do cache actions + // if there's something in the cache for this table. + if (!tableLocations.isEmpty()) { + // cut the cache so that we only get the part that could contain + // regions that match our key + SoftValueSortedMap matchingRegions = + tableLocations.headMap(row); + + // if that portion of the map is empty, then we're done. otherwise, + // we need to examine the cached location to verify that it is + // a match by end key as well. + if (!matchingRegions.isEmpty()) { + HRegionLocation possibleRegion = + matchingRegions.get(matchingRegions.lastKey()); + byte [] endKey = possibleRegion.getRegionInfo().getEndKey(); + + // by nature of the map, we know that the start key has to be < + // otherwise it wouldn't be in the headMap. + if (Bytes.equals(endKey, HConstants.EMPTY_END_ROW) || + KeyValue.getRowComparator(tableName).compareRows(endKey, 0, endKey.length, + row, 0, row.length) > 0) { + // delete any matching entry + HRegionLocation rl = + tableLocations.remove(matchingRegions.lastKey()); + if (rl != null && LOG.isDebugEnabled()) { + LOG.debug("Removed " + rl.getRegionInfo().getRegionNameAsString() + + " for tableName=" + Bytes.toString(tableName) + " from cache " + + "because of " + Bytes.toStringBinary(row)); + } } } } @@ -912,7 +921,7 @@ public class HConnectionManager implements HConstants { " is " + location.getServerAddress()); } } - + public HRegionInterface getHRegionConnection( HServerAddress regionServer, boolean getMaster) throws IOException { @@ -1159,7 +1168,7 @@ public class HConnectionManager implements HConstants { * @return Count of how many added or -1 if all added. 
* @throws IOException */ - int process(final ArrayList list, final byte[] tableName) + int process(final List list, final byte[] tableName) throws IOException { byte [] region = getRegionName(tableName, list.get(0).getRow(), false); byte [] currentRegion = region; @@ -1266,7 +1275,7 @@ public class HConnectionManager implements HConstants { return b.process(list, tableName); } - public int processBatchOfDeletes(final ArrayList list, + public int processBatchOfDeletes(final List list, final byte[] tableName) throws IOException { if (list.isEmpty()) return 0; @@ -1305,5 +1314,127 @@ public class HConnectionManager implements HConstants { } } } - } + + public void processBatchOfPuts(List list, + final byte[] tableName, ExecutorService pool) throws IOException { + for ( int tries = 0 ; tries < numRetries && !list.isEmpty(); ++tries) { + Collections.sort(list); + Map regionPuts = + new HashMap(); + // step 1: + // break up into regionserver-sized chunks and build the data structs + for ( Put put : list ) { + byte [] row = put.getRow(); + + HRegionLocation loc = locateRegion(tableName, row, true); + HServerAddress address = loc.getServerAddress(); + byte [] regionName = loc.getRegionInfo().getRegionName(); + + MultiPut mput = regionPuts.get(address); + if (mput == null) { + mput = new MultiPut(address); + regionPuts.put(address, mput); + } + mput.add(regionName, put); + } + + // step 2: + // make the requests + // Discard the map, just use a list now, makes error recovery easier. + List multiPuts = new ArrayList(regionPuts.values()); + + List> futures = + new ArrayList>(regionPuts.size()); + for ( MultiPut put : multiPuts ) { + futures.add(pool.submit(createPutCallable(put.address, + put, + tableName))); + } + // RUN! + List failed = new ArrayList(); + + // step 3: + // collect the failures and tries from step 1. + for (int i = 0; i < futures.size(); i++ ) { + Future future = futures.get(i); + MultiPut request = multiPuts.get(i); + try { + MultiPutResponse resp = future.get(); + + // For each region + for (Map.Entry> e : request.puts.entrySet()) { + Integer result = resp.getAnswer(e.getKey()); + if (result == null) { + // failed + LOG.debug("Failed all for region: " + + Bytes.toStringBinary(e.getKey()) + ", removing from cache"); + failed.addAll(e.getValue()); + } else if (result >= 0) { + // some failures + List lst = e.getValue(); + failed.addAll(lst.subList(result, lst.size())); + LOG.debug("Failed past " + result + " for region: " + + Bytes.toStringBinary(e.getKey()) + ", removing from cache"); + } + } + } catch (InterruptedException e) { + // go into the failed list. + LOG.debug("Failed all from " + request.address, e); + failed.addAll(request.allPuts()); + } catch (ExecutionException e) { + System.out.println(e); + // all go into the failed list. + LOG.debug("Failed all from " + request.address, e); + failed.addAll(request.allPuts()); + } + } + list.clear(); + if (!failed.isEmpty()) { + for (Put failedPut: failed) { + deleteCachedLocation(tableName, failedPut.getRow()); + } + + list.addAll(failed); + + long sleepTime = getPauseTime(tries); + LOG.debug("processBatchOfPuts had some failures, sleeping for " + sleepTime + + " ms!"); + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + + } + } + } + if (!list.isEmpty()) { + // ran out of retries and didnt succeed everything! + throw new RetriesExhaustedException("Still had " + list.size() + " puts left after retrying " + + numRetries + " times. 
Should have detail on which Regions failed the most"); + } + } + + + private Callable createPutCallable( + final HServerAddress address, final MultiPut puts, + final byte [] tableName) { + final HConnection connection = this; + return new Callable() { + public MultiPutResponse call() throws IOException { + return getRegionServerWithRetries( + new ServerCallable(connection, tableName, null) { + public MultiPutResponse call() throws IOException { + MultiPutResponse resp = server.multiPut(puts); + resp.request = puts; + return resp; + } + @Override + public void instantiateServer(boolean reload) throws IOException { + server = connection.getHRegionConnection(address); + } + } + ); + } + }; + } + } } diff --git a/src/java/org/apache/hadoop/hbase/client/HTable.java b/src/java/org/apache/hadoop/hbase/client/HTable.java index e3f0de8..c730b1f 100644 --- a/src/java/org/apache/hadoop/hbase/client/HTable.java +++ b/src/java/org/apache/hadoop/hbase/client/HTable.java @@ -26,6 +26,12 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.TreeMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -70,6 +76,8 @@ public class HTable { private boolean autoFlush; private long currentWriteBufferSize; protected int scannerCaching; + private int maxKeyValueSize; + private long maxScannerResultSize; /** @@ -130,12 +138,38 @@ public class HTable { this.autoFlush = true; this.currentWriteBufferSize = 0; this.scannerCaching = conf.getInt("hbase.client.scanner.caching", 1); + this.maxScannerResultSize = conf.getLong( HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); + this.maxKeyValueSize = conf.getInt("hbase.client.keyvalue.maxsize", -1); + + int nrHRS = getCurrentNrHRS(); + int nrThreads = conf.getInt("hbase.htable.threads.max", nrHRS); + + // Unfortunately Executors.newCachedThreadPool does not allow us to + // set the maximum size of the pool, so we have to do it ourselves. 
+ this.pool = new ThreadPoolExecutor(0, nrThreads, + 60, TimeUnit.SECONDS, + new LinkedBlockingQueue(), + new DaemonThreadFactory()); } /** + * TODO Might want to change this to public, would be nice if the number + * of threads would automatically change when servers were added and removed + * @return the number of region servers that are currently running + * @throws IOException + */ + private int getCurrentNrHRS() throws IOException { + HBaseAdmin admin = new HBaseAdmin(this.configuration); + return admin.getClusterStatus().getServers(); + } + + // For multiput + private ExecutorService pool; + + /** * @param tableName name of table to check * @return true if table is on-line * @throws IOException @@ -604,11 +638,11 @@ public class HTable { * @throws IOException */ public void flushCommits() throws IOException { - int last = 0; try { - last = connection.processBatchOfRows(writeBuffer, tableName); + connection.processBatchOfPuts(writeBuffer, + tableName, pool); } finally { - writeBuffer.subList(0, last).clear(); + // the write buffer was adjsuted by processBatchOfPuts currentWriteBufferSize = 0; for (int i = 0; i < writeBuffer.size(); i++) { currentWriteBufferSize += writeBuffer.get(i).heapSize(); @@ -2198,4 +2232,31 @@ public class HTable { } return n; } + + static class DaemonThreadFactory implements ThreadFactory { + static final AtomicInteger poolNumber = new AtomicInteger(1); + final ThreadGroup group; + final AtomicInteger threadNumber = new AtomicInteger(1); + final String namePrefix; + + DaemonThreadFactory() { + SecurityManager s = System.getSecurityManager(); + group = (s != null)? s.getThreadGroup() : + Thread.currentThread().getThreadGroup(); + namePrefix = "pool-" + + poolNumber.getAndIncrement() + + "-thread-"; + } + + public Thread newThread(Runnable r) { + Thread t = new Thread(group, r, + namePrefix + threadNumber.getAndIncrement(), + 0); + if (!t.isDaemon()) + t.setDaemon(true); + if (t.getPriority() != Thread.NORM_PRIORITY) + t.setPriority(Thread.NORM_PRIORITY); + return t; + } + } } diff --git a/src/java/org/apache/hadoop/hbase/client/MultiPut.java b/src/java/org/apache/hadoop/hbase/client/MultiPut.java new file mode 100644 index 0000000..073d132 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/client/MultiPut.java @@ -0,0 +1,107 @@ +/* + * Copyright 2009 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.HServerAddress; + +import java.io.DataOutput; +import java.io.IOException; +import java.io.DataInput; +import java.util.List; +import java.util.Map; +import java.util.ArrayList; +import java.util.Collection; +import java.util.TreeMap; + +public class MultiPut implements Writable { + public HServerAddress address; // client code ONLY + + // map of regions to lists of puts for that region. + public Map > puts = new TreeMap>(Bytes.BYTES_COMPARATOR); + + public MultiPut() {} + + public MultiPut(HServerAddress a) { + address = a; + } + + public int size() { + int size = 0; + for( List l : puts.values()) { + size += l.size(); + } + return size; + } + + public void add(byte[] regionName, Put aPut) { + List rsput = puts.get(regionName); + if (rsput == null) { + rsput = new ArrayList(); + puts.put(regionName, rsput); + } + rsput.add(aPut); + } + + public Collection allPuts() { + List res = new ArrayList(); + for ( List pp : puts.values() ) { + res.addAll(pp); + } + return res; + } + + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(puts.size()); + for( Map.Entry> e : puts.entrySet()) { + Bytes.writeByteArray(out, e.getKey()); + + List ps = e.getValue(); + out.writeInt(ps.size()); + for( Put p : ps ) { + p.write(out); + } + } + } + + @Override + public void readFields(DataInput in) throws IOException { + puts.clear(); + + int mapSize = in.readInt(); + + for (int i = 0 ; i < mapSize; i++) { + byte[] key = Bytes.readByteArray(in); + + int listSize = in.readInt(); + List ps = new ArrayList(listSize); + for ( int j = 0 ; j < listSize; j++ ) { + Put put = new Put(); + put.readFields(in); + ps.add(put); + } + puts.put(key, ps); + } + } +} diff --git a/src/java/org/apache/hadoop/hbase/client/MultiPutResponse.java b/src/java/org/apache/hadoop/hbase/client/MultiPutResponse.java new file mode 100644 index 0000000..198c964 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/client/MultiPutResponse.java @@ -0,0 +1,71 @@ +/* + * Copyright 2009 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.hbase.util.Bytes; + +import java.io.DataOutput; +import java.io.IOException; +import java.io.DataInput; +import java.util.Map; +import java.util.List; +import java.util.ArrayList; +import java.util.TreeMap; + +public class MultiPutResponse implements Writable { + + public MultiPut request; // used in client code ONLY + + public Map answers = new TreeMap(Bytes.BYTES_COMPARATOR); + + public MultiPutResponse() {} + + public void addResult(byte[] regionName, int result) { + answers.put(regionName, result); + } + + public Integer getAnswer(byte[] region) { + return answers.get(region); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(answers.size()); + for( Map.Entry e : answers.entrySet()) { + Bytes.writeByteArray(out, e.getKey()); + out.writeInt(e.getValue()); + } + } + + @Override + public void readFields(DataInput in) throws IOException { + answers.clear(); + + int mapSize = in.readInt(); + for( int i = 0 ; i < mapSize ; i++ ) { + byte[] key = Bytes.readByteArray(in); + int value = in.readInt(); + + answers.put(key, value); + } + } +} diff --git a/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java b/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java index d78797f..2b09617 100644 --- a/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java +++ b/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.lang.reflect.Array; import java.util.HashMap; import java.util.Map; +import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -45,6 +46,8 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.PrefixFilter; +import org.apache.hadoop.hbase.client.MultiPutResponse; +import org.apache.hadoop.hbase.client.MultiPut; import org.apache.hadoop.hbase.filter.*; import org.apache.hadoop.hbase.io.HbaseMapWritable; import org.apache.hadoop.io.MapWritable; @@ -154,6 +157,12 @@ public class HbaseObjectWritable implements Writable, Configurable { addToMap(FirstKeyOnlyFilter.class, code++); addToMap(Delete [].class, code++); + + addToMap(MultiPut.class, code++); + addToMap(MultiPutResponse.class, code++); + + // List + addToMap(List.class, code++); } private Class declaredClass; @@ -457,4 +466,4 @@ public class HbaseObjectWritable implements Writable, Configurable { public Configuration getConf() { return this.conf; } -} \ No newline at end of file +} diff --git a/src/java/org/apache/hadoop/hbase/ipc/HBaseRPCProtocolVersion.java b/src/java/org/apache/hadoop/hbase/ipc/HBaseRPCProtocolVersion.java index 793a6f1..f407886 100644 --- a/src/java/org/apache/hadoop/hbase/ipc/HBaseRPCProtocolVersion.java +++ b/src/java/org/apache/hadoop/hbase/ipc/HBaseRPCProtocolVersion.java @@ -73,7 +73,9 @@ public interface HBaseRPCProtocolVersion extends VersionedProtocol { *
   * <li>Version 19: Added getClusterStatus().</li>
   * <li>Version 20: Backed Transaction HBase out of HBase core.</li>
   * <li>Version 21: HBASE-1665.</li>
+  * <li>Version 22: HBASE-2209. Added List support to RPC</li>
+  * <li>Version 23: HBASE-2066, multi-put.</li>
  • * */ - public static final long versionID = 21L; + public static final long versionID = 23L; } diff --git a/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java b/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java index 770e906..902cf89 100644 --- a/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java +++ b/src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java @@ -29,6 +29,8 @@ import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.MultiPutResponse; +import org.apache.hadoop.hbase.client.MultiPut; import org.apache.hadoop.hbase.regionserver.HRegion; /** @@ -245,4 +247,15 @@ public interface HRegionInterface extends HBaseRPCProtocolVersion { * @throws IOException */ public HServerInfo getHServerInfo() throws IOException; + + + /** + * Multi put for putting multiple regions worth of puts at once. + * + * @param puts the request + * @return the reply + * @throws IOException + */ + public MultiPutResponse multiPut(MultiPut puts) throws IOException; + } diff --git a/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 94c506c..3f33b7c 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -83,6 +83,8 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.ServerConnection; import org.apache.hadoop.hbase.client.ServerConnectionManager; +import org.apache.hadoop.hbase.client.MultiPutResponse; +import org.apache.hadoop.hbase.client.MultiPut; import org.apache.hadoop.hbase.io.hfile.LruBlockCache; import org.apache.hadoop.hbase.ipc.HBaseRPC; import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler; @@ -1781,16 +1783,16 @@ public class HRegionServer implements HConstants, HRegionInterface, if (!region.getRegionInfo().isMetaTable()) { this.cacheFlusher.reclaimMemStoreMemory(); } - Integer[] locks = new Integer[puts.length]; for (i = 0; i < puts.length; i++) { this.requestCount.incrementAndGet(); - locks[i] = getLockFromId(puts[i].getLockId()); - region.put(puts[i], locks[i]); + Integer lock = getLockFromId(puts[i].getLockId()); + region.put(puts[i], lock); } } catch (WrongRegionException ex) { LOG.debug("Batch puts: " + i, ex); return i; } catch (NotServingRegionException ex) { + LOG.debug("Batch puts: " + i, ex); return i; } catch (Throwable t) { throw convertThrowableToIOE(cleanup(t)); @@ -2517,4 +2519,20 @@ public class HRegionServer implements HConstants, HRegionInterface, doMain(args, regionServerClass); } + + @Override + public MultiPutResponse multiPut(MultiPut puts) throws IOException { + MultiPutResponse resp = new MultiPutResponse(); + + // do each region as it's own. 
+ for( Map.Entry> e: puts.puts.entrySet()) { + int result = put(e.getKey(), e.getValue().toArray(new Put[]{})); + resp.addResult(e.getKey(), result); + + e.getValue().clear(); // clear some RAM + } + + return resp; + } + } diff --git a/src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java b/src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java index 30fdbed..2dfdd0a 100644 --- a/src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java +++ b/src/test/org/apache/hadoop/hbase/HBaseClusterTestCase.java @@ -81,6 +81,8 @@ public abstract class HBaseClusterTestCase extends HBaseTestCase { } /** + * Subclass hook. + * * Run after dfs is ready but before hbase cluster is started up. */ protected void preHBaseClusterSetup() throws Exception { diff --git a/src/test/org/apache/hadoop/hbase/MultiRegionTable.java b/src/test/org/apache/hadoop/hbase/MultiRegionTable.java index 804e535..234c197 100644 --- a/src/test/org/apache/hadoop/hbase/MultiRegionTable.java +++ b/src/test/org/apache/hadoop/hbase/MultiRegionTable.java @@ -28,7 +28,7 @@ import org.apache.hadoop.hbase.util.Bytes; * Utility class to build a table of multiple regions. */ public class MultiRegionTable extends HBaseClusterTestCase { - private static final byte [][] KEYS = { + protected static final byte [][] KEYS = { HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("bbb"), Bytes.toBytes("ccc"), @@ -62,9 +62,14 @@ public class MultiRegionTable extends HBaseClusterTestCase { /** * @param columnName the column to populate. */ - public MultiRegionTable(final String columnName) { - super(); - this.columnFamily = Bytes.toBytes(columnName); + public MultiRegionTable(final String familyName) { + this(1, familyName); + } + + public MultiRegionTable(int nServers, final String familyName) { + super(nServers); + + this.columnFamily = Bytes.toBytes(familyName); // These are needed for the new and improved Map/Reduce framework System.setProperty("hadoop.log.dir", conf.get("hadoop.log.dir")); conf.set("mapred.output.dir", conf.get("hadoop.tmp.dir")); diff --git a/src/test/org/apache/hadoop/hbase/TestMultiParallelPut.java b/src/test/org/apache/hadoop/hbase/TestMultiParallelPut.java new file mode 100644 index 0000000..89cc869 --- /dev/null +++ b/src/test/org/apache/hadoop/hbase/TestMultiParallelPut.java @@ -0,0 +1,97 @@ +/* + * Copyright 2009 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase; + +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.util.Bytes; + +import java.util.List; +import java.util.ArrayList; + +public class TestMultiParallelPut extends MultiRegionTable { + private static final byte[] VALUE = Bytes.toBytes("value"); + private static final byte[] QUALIFIER = Bytes.toBytes("qual"); + private static final String FAMILY = "family"; + private static final String TEST_TABLE = "test_table"; + private static final byte[] BYTES_FAMILY = Bytes.toBytes(FAMILY); + + + public TestMultiParallelPut() { + super(2, FAMILY); + desc = new HTableDescriptor(TEST_TABLE); + desc.addFamily(new HColumnDescriptor(FAMILY)); + + makeKeys(); + } + + private void makeKeys() { + for (byte [] k : KEYS) { + byte [] cp = new byte[k.length+1]; + System.arraycopy(k, 0, cp, 0, k.length); + cp[k.length] = 1; + + keys.add(cp); + } + } + + List keys = new ArrayList(); + + public void testMultiPut() throws Exception { + + HTable table = new HTable(TEST_TABLE); + table.setAutoFlush(false); + table.setWriteBufferSize(10 * 1024 * 1024); + + for ( byte [] k : keys ) { + Put put = new Put(k); + put.add(BYTES_FAMILY, QUALIFIER, VALUE); + + table.put(put); + } + + table.flushCommits(); + + for (byte [] k : keys ) { + Get get = new Get(k); + get.addColumn(BYTES_FAMILY, QUALIFIER); + + Result r = table.get(get); + + assertTrue(r.containsColumn(BYTES_FAMILY, QUALIFIER)); + assertEquals(0, + Bytes.compareTo(VALUE, + r.getValue(BYTES_FAMILY, QUALIFIER))); + } + + HBaseAdmin admin = new HBaseAdmin(conf); + ClusterStatus cs = admin.getClusterStatus(); + + assertEquals(2, cs.getServers()); + for ( HServerInfo info : cs.getServerInfo()) { + System.out.println(info); + assertTrue( info.getLoad().getNumberOfRegions() > 10); + } + } +}
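
Not part of the patch itself, but a minimal client-side sketch of how the new parallel flush path would be driven once this change is applied. The class name, table name ("my_table"), column family, qualifier, row count, buffer size, and pool size of 8 are made-up placeholders; the "hbase.htable.threads.max" key and the behaviour described in the comments come from the HTable and HConnectionManager changes above.

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class ParallelPutSketch {
  public static void main(String[] args) throws Exception {
    HBaseConfiguration conf = new HBaseConfiguration();
    // Cap the per-HTable write pool added by this patch; if unset it defaults
    // to the current number of region servers (see getCurrentNrHRS()).
    conf.setInt("hbase.htable.threads.max", 8);

    HTable table = new HTable(conf, "my_table");   // hypothetical table name
    table.setAutoFlush(false);                     // buffer puts client-side
    table.setWriteBufferSize(2 * 1024 * 1024);     // flush roughly every 2 MB

    byte[] family = Bytes.toBytes("f");            // hypothetical column family
    byte[] qualifier = Bytes.toBytes("q");
    for (int i = 0; i < 100000; i++) {
      Put put = new Put(Bytes.toBytes(String.format("row-%08d", i)));
      put.add(family, qualifier, Bytes.toBytes(i));
      table.put(put);                              // buffered; nothing sent yet
    }

    // flushCommits() now goes through processBatchOfPuts(): the buffered Puts
    // are grouped into one MultiPut per region server, the MultiPuts are sent
    // in parallel on the thread pool, and only the Puts reported as failed in
    // each MultiPutResponse are retried (up to hbase.client.retries.number times).
    table.flushCommits();
  }
}

Compared with the old processBatchOfRows() path, a slow or failed region server now only delays its own portion of the write buffer, and only the Puts it reported as failed are resent on the next retry attempt.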