Index: src/test/org/apache/hadoop/hbase/manual/utils/DisplayFormatUtils.java
===================================================================
--- src/test/org/apache/hadoop/hbase/manual/utils/DisplayFormatUtils.java	(revision 0)
+++ src/test/org/apache/hadoop/hbase/manual/utils/DisplayFormatUtils.java	(revision 0)
@@ -0,0 +1,48 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.manual.utils;
+
+public class DisplayFormatUtils
+{
+  public static String formatTime(long elapsedTime) {
+    String format = String.format("%%0%dd", 2);
+    elapsedTime = elapsedTime / 1000;
+    String seconds = String.format(format, elapsedTime % 60);
+    String minutes = String.format(format, (elapsedTime % 3600) / 60);
+    String hours = String.format(format, elapsedTime / 3600);
+    String time = hours + ":" + minutes + ":" + seconds;
+    return time;
+  }
+
+  public static String formatNumber(long number) {
+    if(number >= 1000000000) {
+      return ((number/1000000000) + "B");
+    }
+    else if(number >= 1000000) {
+      return ((number/1000000) + "M");
+    }
+    else if(number >= 1000) {
+      return ((number/1000) + "K");
+    }
+    else {
+      return (number + "");
+    }
+  }
+}
Index: src/test/org/apache/hadoop/hbase/manual/utils/DataGenerator.java
===================================================================
--- src/test/org/apache/hadoop/hbase/manual/utils/DataGenerator.java	(revision 0)
+++ src/test/org/apache/hadoop/hbase/manual/utils/DataGenerator.java	(revision 0)
@@ -0,0 +1,88 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.manual.utils;
+
+import java.util.Random;
+
+public class DataGenerator {
+  static Random random_ = new Random();
+  /* one byte fill pattern */
+  public static final String fill1B_ = "-";
+  /* 64 byte fill pattern */
+  public static final String fill64B_ = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789 ";
+  /* alternate 64 byte fill pattern */
+  public static final String fill64BAlt_ = "AaBbCcDdEeFfGgHhIiJjKkLlMmNnOoPpQqRrSsTtUuVvWwXxYyZz0123456789+-";
+  /* 1K fill pattern */
+  public static final String fill1K_ = fill64BAlt_+fill64BAlt_+fill64BAlt_+fill64BAlt_+
+                                       fill64BAlt_+fill64BAlt_+fill64BAlt_+fill64BAlt_+
+                                       fill64BAlt_+fill64BAlt_+fill64BAlt_+fill64BAlt_+
+                                       fill64BAlt_+fill64BAlt_+fill64BAlt_+fill64BAlt_;
+
+  int minDataSize_ = 0;
+  int maxDataSize_ = 0;
+
+  static public String paddedKey(long key) {
+    // left-pad key with zeroes to 10 decimal places.
+    String paddedKey = String.format("%010d", key);
+
+    // flip the key to randomize
+    return (new StringBuffer(paddedKey)).reverse().toString();
+  }
+
+  public DataGenerator(int minDataSize, int maxDataSize) {
+    minDataSize_ = minDataSize;
+    maxDataSize_ = maxDataSize;
+  }
+
+  public byte[] getDataInSize(long key) {
+    int dataSize = minDataSize_ + random_.nextInt(Math.abs(maxDataSize_ - minDataSize_));
+    StringBuilder sb = new StringBuilder();
+
+    // write the key first
+    int sizeLeft = dataSize;
+    String keyAsString = DataGenerator.paddedKey(key);
+    sb.append(keyAsString);
+    sizeLeft -= keyAsString.length();
+
+    for(int i = 0; i < sizeLeft/1024; ++i)
+    {
+      sb.append(fill1K_);
+    }
+    sizeLeft = sizeLeft % 1024;
+    for(int i = 0; i < sizeLeft/64; ++i)
+    {
+      sb.append(fill64B_);
+    }
+    sizeLeft = sizeLeft % 64;
+    // pad whatever is left, one byte at a time
+    for(int i = 0; i < sizeLeft; ++i)
+    {
+      sb.append(fill1B_);
+    }
+
+    return sb.toString().getBytes();
+  }
+
+  public static boolean verify(String rowKey, String actionId, String data) {
+    if(!data.startsWith(rowKey)) {
+      return false;
+    }
+    return true;
+  }
+}
Index: src/test/org/apache/hadoop/hbase/manual/utils/HBaseUtils.java
===================================================================
--- src/test/org/apache/hadoop/hbase/manual/utils/HBaseUtils.java	(revision 0)
+++ src/test/org/apache/hadoop/hbase/manual/utils/HBaseUtils.java	(revision 0)
@@ -0,0 +1,83 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.hadoop.hbase.manual.utils; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MasterNotRunningException; +import org.apache.hadoop.hbase.TableExistsException; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; + + +public class HBaseUtils +{ + private static final Log LOG = LogFactory.getLog(HBaseUtils.class); + + public static HTable getHTable(HBaseConfiguration conf, byte[] tableName) { + HTable table = null; + try { + table = new HTable(conf, tableName); + } + catch (IOException e) + { + // TODO Auto-generated catch block + e.printStackTrace(); + } + return table; + } + + public static void createTableIfNotExists(HBaseConfiguration conf, byte[] tableName, byte[][] columnFamilies) { + HTableDescriptor desc = new HTableDescriptor(tableName); + for(byte[] cfName : columnFamilies) { + desc.addFamily(new HColumnDescriptor(cfName)); + } + try + { + HBaseAdmin admin = new HBaseAdmin(conf); + admin.createTable(desc); + } + catch(MasterNotRunningException e) { + LOG.error("Master not running."); + e.printStackTrace(); + } + catch(TableExistsException e) { + LOG.info("Table already exists."); + } + catch (IOException e) + { + LOG.error("IO Exception."); + e.printStackTrace(); + } + } + + public static HBaseConfiguration getHBaseConfFromZkNode(String zkNodeName) throws IOException { + Configuration c = new Configuration(); + c.set("hbase.zookeeper.quorum", zkNodeName); + return new HBaseConfiguration(c); + } + +} Index: src/test/org/apache/hadoop/hbase/manual/utils/MultiThreadedWriter.java =================================================================== --- src/test/org/apache/hadoop/hbase/manual/utils/MultiThreadedWriter.java (revision 0) +++ src/test/org/apache/hadoop/hbase/manual/utils/MultiThreadedWriter.java (revision 0) @@ -0,0 +1,211 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.manual.utils; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.manual.HBaseTest; + +public class MultiThreadedWriter extends MultiThreadedAction +{ + private static final Log LOG = LogFactory.getLog(MultiThreadedWriter.class); + static long minColumnsPerKey_ = 1; + static long maxColumnsPerKey_ = 10; + static int minDataSize_ = 256; + static int maxDataSize_ = 1024; + Set writers_ = new HashSet(); + static boolean bulkLoad_ = false; + /* This is the current key to be inserted by any thread. Each thread does an + atomic get and increment operation and inserts the current value. */ + public static AtomicLong currentKey_ = null; + /* The sorted set of keys inserted by the writers */ + public static SortedSet insertedKeySet_ = Collections.synchronizedSortedSet(new TreeSet()); + /* The sorted set of keys NOT inserted by the writers */ + public static SortedSet failedKeySet_ = Collections.synchronizedSortedSet(new TreeSet()); + + public MultiThreadedWriter(byte[] tableName, byte[] columnFamily) { + tableName_ = tableName; + columnFamily_ = columnFamily; + } + + public void setBulkLoad(boolean bulkLoad) { + bulkLoad_ = bulkLoad; + } + + public void setColumnsPerKey(long minColumnsPerKey, long maxColumnsPerKey) { + minColumnsPerKey_ = minColumnsPerKey; + maxColumnsPerKey_ = maxColumnsPerKey; + } + + public void setDataSize(int minDataSize, int maxDataSize) { + minDataSize_ = minDataSize; + maxDataSize_ = maxDataSize; + } + + public void start(long startKey, long endKey, int numThreads) { + if(verbose_) { + LOG.debug("Inserting keys [" + startKey + ", " + endKey + ")"); + } + startKey_ = startKey; + endKey_ = endKey; + numThreads_ = numThreads; + currentKey_ = new AtomicLong(startKey_); + + for(int i = 0; i < numThreads_; ++i) { + HBaseWriter writer = new HBaseWriter(i); + writers_.add(writer); + } + numThreadsWorking_ = new AtomicInteger(writers_.size()); + for(HBaseWriter writer : writers_) { + writer.start(); + } + + startReporter(); + } + + public void waitForFinish() { + while(numThreadsWorking_.get() != 0) { + try { + Thread.sleep(1000); + } + catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + public static class HBaseWriter extends Thread { + int id_; + Random random_ = new Random(); + List tables_ = new ArrayList(); + static DataGenerator dataGenerator_ = new DataGenerator(minDataSize_, maxDataSize_); + + public HBaseWriter(int id) { + id_ = id; + for(HBaseConfiguration conf : HBaseTest.configList_) { +// try { + HTable table = HBaseUtils.getHTable(conf, tableName_); +// LOG.info("setAutoFlush is default"); +// table.setAutoFlush(false); +// table.setWriteBufferSize(1024*1024*12); +// table.setScannerCaching(30); + tables_.add(table); +// } catch(IOException e) { +// e.printStackTrace(); +// } + } + } + + public void run() { + if(MultiThreadedWriter.bulkLoad_) { + long rowKey = currentKey_.getAndIncrement(); + do { + long numColumns = minColumnsPerKey_ + 
Math.abs(random_.nextLong())%(maxColumnsPerKey_-minColumnsPerKey_); + bulkInsertKey(rowKey, 0, numColumns); + rowKey = currentKey_.getAndIncrement(); + } while(rowKey < endKey_); + } + else { + long rowKey = currentKey_.getAndIncrement(); + do { + long numColumns = minColumnsPerKey_ + Math.abs(random_.nextLong())%(maxColumnsPerKey_-minColumnsPerKey_); + for(long col = 0; col < numColumns; ++col) { + insert(rowKey, col); + } + rowKey = currentKey_.getAndIncrement(); + } while(rowKey < endKey_); + } + numThreadsWorking_.decrementAndGet(); + } + + public static byte[] longToByteArrayKey(long rowKey) { + return DataGenerator.paddedKey(rowKey).getBytes(); + } + + public void insert(long rowKey, long col) { + Put put = new Put(longToByteArrayKey(rowKey)); + put.add(columnFamily_, ("" + col).getBytes(), dataGenerator_.getDataInSize(rowKey)); + try { + long start = System.currentTimeMillis(); + putIntoTables(put); + insertedKeySet_.add(rowKey); + numKeys_.addAndGet(1); + numCols_.addAndGet(1); + cumulativeOpTime_.addAndGet(System.currentTimeMillis() - start); + } + catch (IOException e) { + failedKeySet_.add(rowKey); + e.printStackTrace(); + } + } + + public void bulkInsertKey(long rowKey, long startCol, long endCol) { + if (verbose_) { + LOG.debug("Preparing put for key = " + rowKey + ", cols = [" + startCol + ", " + endCol + ")"); + } + + if(startCol >= endCol) { + return; + } + Put put = new Put(DataGenerator.paddedKey(rowKey).getBytes()); + byte[] columnQualifier; + byte[] value; + for(long i = startCol; i < endCol; ++i) { + value = dataGenerator_.getDataInSize(rowKey); + columnQualifier = ("" + i).getBytes(); + put.add(columnFamily_, columnQualifier, value); + } + try { + long start = System.currentTimeMillis(); + putIntoTables(put); + insertedKeySet_.add(rowKey); + numKeys_.addAndGet(1); + numCols_.addAndGet(endCol - startCol); + cumulativeOpTime_.addAndGet(System.currentTimeMillis() - start); + } + catch (IOException e) { + failedKeySet_.add(rowKey); + e.printStackTrace(); + } + } + + // error handling correct only for ONE table + public void putIntoTables(Put put) throws IOException { + for(HTable table : tables_) { + table.put(put); + } + } + } +} Index: src/test/org/apache/hadoop/hbase/manual/utils/MultiThreadedAction.java =================================================================== --- src/test/org/apache/hadoop/hbase/manual/utils/MultiThreadedAction.java (revision 0) +++ src/test/org/apache/hadoop/hbase/manual/utils/MultiThreadedAction.java (revision 0) @@ -0,0 +1,104 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.manual.utils; + +import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + + +public abstract class MultiThreadedAction +{ + private static final Log LOG = LogFactory.getLog(MultiThreadedAction.class); + public static int numThreads_ = 1; + public static byte[] tableName_; + public static byte[] columnFamily_; + public static float verifyPercent_ = 0; + public static long startKey_ = 0; + public static long endKey_ = 1; + public static AtomicInteger numThreadsWorking_; + public static AtomicLong numKeys_ = new AtomicLong(0); + public static AtomicLong numKeysVerified_ = new AtomicLong(0); + public static AtomicLong numCols_ = new AtomicLong(0); + public static AtomicLong numErrors_ = new AtomicLong(0); + public static AtomicLong numOpFailures_ = new AtomicLong(0); + public static AtomicLong cumulativeOpTime_ = new AtomicLong(0); + public static boolean verbose_ = false; + public static Random random_ = new Random(); + + public static void startReporter() { + (new ProgressReporter()).start(); + } + + public static class ProgressReporter extends Thread { + public void run() { + long startTime = System.currentTimeMillis(); + long reportingInterval = 5000; + + long priorNumKeys = 0; + long priorCumulativeOpTime = 0; + + while(numThreadsWorking_.get() != 0) { + String threadsLeft = "[" + numThreadsWorking_.get() + "] "; + if(numKeys_.get() == 0) { + LOG.info(threadsLeft + "Number of keys = 0"); + } + else { + long numKeys = numKeys_.get(); + long time = System.currentTimeMillis() - startTime; + long cumulativeOpTime = cumulativeOpTime_.get(); + + long numKeysDelta = numKeys - priorNumKeys; + long cumulativeOpTimeDelta = cumulativeOpTime - priorCumulativeOpTime; + + LOG.info(threadsLeft + "Keys = " + numKeys + + ", cols = " + DisplayFormatUtils.formatNumber(numCols_.get()) + + ", time = " + DisplayFormatUtils.formatTime(time) + + ((numKeys > 0 && time > 0)? (" Overall: [" + + "keys/s = " + numKeys*1000/time + + ", latency = " + cumulativeOpTime/numKeys + " ms]") + : "") + + ((numKeysDelta > 0) ? (" Current: [" + + "keys/s = " + numKeysDelta*1000/reportingInterval + + ", latency = " + cumulativeOpTimeDelta/numKeysDelta + " ms]") + : "") + + ((numKeysVerified_.get()>0)?(", verified = " + numKeysVerified_.get()):"") + + ((numOpFailures_.get()>0)?(", FAILURES = " + numOpFailures_.get()):"") + + ((numErrors_.get()>0)?(", ERRORS = " + numErrors_.get()):"") + ); + + priorNumKeys = numKeys; + priorCumulativeOpTime = cumulativeOpTime; + } + try { + Thread.sleep(reportingInterval); + } + catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + } + + public abstract void start(long startKey, long endKey, int numThreads); +} Index: src/test/org/apache/hadoop/hbase/manual/utils/KillProcessesAndVerify.java =================================================================== --- src/test/org/apache/hadoop/hbase/manual/utils/KillProcessesAndVerify.java (revision 0) +++ src/test/org/apache/hadoop/hbase/manual/utils/KillProcessesAndVerify.java (revision 0) @@ -0,0 +1,190 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.manual.utils; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.Random; +import java.util.SortedSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.manual.HBaseTest; +import org.apache.hadoop.hbase.util.Bytes; + + +public class KillProcessesAndVerify extends Thread +{ + private static final Log LOG = LogFactory.getLog(KillProcessesAndVerify.class); + /* One minute in millis */ + public static final int TIME_INTERVAL_ONE_MIN = 1*(60*1000); + /* wait between killing an RS/DN - set to 1 hour */ + public int TIME_INTERVAL_BETWEEN_KILLS = 60 * TIME_INTERVAL_ONE_MIN; + /* percent of time we want the RS killed (as opposed to the DN) */ + public int RS_KILL_PERCENT = 80; + /* how many keys to verify when the server is killed - 4 minute window */ + public long NUM_KEYS_TO_VERIFY = 25000; + /* the HTable */ + HTable hTable_ = null; + /* the cluster name */ + public static String clusterHBasePath_ = null; + public static String clusterHDFSPath_ = null; + + public static Random random_ = new Random(); + private Runtime runtime_ = null; + + public KillProcessesAndVerify(String clusterHBasePath, String clusterHDFSPath, int timeIntervalBetweenKills, int rSKillPercent, int numKeysToVerify) { + runtime_ = Runtime.getRuntime(); + hTable_ = HBaseUtils.getHTable(HBaseTest.configList_.get(0), MultiThreadedWriter.tableName_); + clusterHBasePath_ = clusterHBasePath; + clusterHDFSPath_ = clusterHDFSPath; + TIME_INTERVAL_BETWEEN_KILLS = timeIntervalBetweenKills * TIME_INTERVAL_ONE_MIN; + RS_KILL_PERCENT = rSKillPercent; + NUM_KEYS_TO_VERIFY = numKeysToVerify; + } + + public void run() { + while(true) { + try + { + // wait for the next iteration of kills + Thread.sleep(TIME_INTERVAL_BETWEEN_KILLS); + + // choose if we are killing an RS or a DN + boolean operateOnRS = (random_.nextInt(100) < RS_KILL_PERCENT); + // choose the node we want to kill + long lastWrittenKey = MultiThreadedWriter.currentKey_.get(); + HRegionLocation hloc = hTable_.getRegionLocation(Bytes.toBytes(lastWrittenKey)); + String nodeName = hloc.getServerAddress().getHostname(); + LOG.debug("Picked type = " + (operateOnRS?"REGIONSERVER":"DATANODE") + ", node = " + nodeName); + + // kill the server + String killCommand = getKillCommand(nodeName, operateOnRS); + executeCommand(killCommand); + LOG.debug("Killed " + (operateOnRS?"REGIONSERVER":"DATANODE") + " on node " + nodeName + ", waiting..."); + + if(true) { + break; + } + + // wait for a while + Thread.sleep(TIME_INTERVAL_ONE_MIN/2); + + // start the server + String startCommand = getStartCommand(nodeName, operateOnRS); + executeCommand(startCommand); + 
LOG.debug("Started " + (operateOnRS?"REGIONSERVER":"DATANODE") + " on node " + nodeName + ", waiting..."); + + // wait for a while + Thread.sleep(TIME_INTERVAL_ONE_MIN); + + // verify the reads that happened in the last 4 minutes - last 25000 keys + long maxKeyBeingWritten = MultiThreadedWriter.currentKey_.get(); + long startKeyToVerify = (maxKeyBeingWritten < NUM_KEYS_TO_VERIFY)?0:(maxKeyBeingWritten - NUM_KEYS_TO_VERIFY); + SortedSet keysToVerify = MultiThreadedWriter.insertedKeySet_.tailSet(startKeyToVerify); + verifyKeys(keysToVerify, MultiThreadedWriter.failedKeySet_); + LOG.debug("Done verifying keys, sleep till next interval"); + } + catch (IOException e1) + { + e1.printStackTrace(); + } + catch (InterruptedException e) + { + e.printStackTrace(); + } + } + } + + public String getKillCommand(String nodeName, boolean rsCommand) { + // construct the remote kill command + String processName = rsCommand?"HRegionServer":"DataNode"; + String killCmd = "/usr/bin/pgrep -f " + processName + " | /usr/bin/xargs /bin/kill -9"; + + // put the ssh call + String remoteKillCmd = "ssh " + nodeName + " " + killCmd; + + return remoteKillCmd; + } + + public String getStartCommand(String nodeName, boolean rsCommand) { + // construct the remote start up command + String startCmd = + rsCommand? + clusterHBasePath_ +"/bin/hbase-daemon.sh start regionserver": + clusterHDFSPath_ + "/bin/hadoop-daemons.sh --config /usr/local/hadoop/HDFS-" + clusterHDFSPath_ + "/conf start datanode"; + + // put the ssh call + String remoteStartCmd = "ssh " + nodeName + " " + startCmd; + + return remoteStartCmd; + } + + public void verifyKeys(SortedSet keysToVerify, SortedSet failedKeys) { + Get get = null; + for(long rowKey : keysToVerify) { + // skip any key that has not been inserted + if(failedKeys.contains(rowKey)) { + continue; + } + // query hbase for the key + get = new Get(MultiThreadedWriter.HBaseWriter.longToByteArrayKey(rowKey)); + get.addFamily(MultiThreadedWriter.columnFamily_); + try + { + Result result = hTable_.get(get); + } + catch (IOException e) + { + // if this is hit, the key was not found + LOG.error("KEY " + rowKey + " was NOT FOUND, it was claimed to be inserted."); + e.printStackTrace(); + } + } + } + + public void executeCommand(String command) throws InterruptedException, IOException { + LOG.debug("Command : " + command); + Process p = runtime_.exec(command); +// p.waitFor(); + BufferedReader stdInput = new BufferedReader(new InputStreamReader(p.getInputStream())); + + BufferedReader stdError = new BufferedReader(new InputStreamReader(p.getErrorStream())); + + // read the output from the command + System.out.println("Here is the standard output of the command:\n"); + String s = null; + + while ((s = stdInput.readLine()) != null) { + System.out.println(s); + } + + // read any errors from the attempted command + System.out.println("Here is the standard error of the command (if any):\n"); + while ((s = stdError.readLine()) != null) { + System.out.println(s); + } + } +} Index: src/test/org/apache/hadoop/hbase/manual/utils/MultiThreadedReader.java =================================================================== --- src/test/org/apache/hadoop/hbase/manual/utils/MultiThreadedReader.java (revision 0) +++ src/test/org/apache/hadoop/hbase/manual/utils/MultiThreadedReader.java (revision 0) @@ -0,0 +1,171 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.manual.utils; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.manual.HBaseTest; + +public class MultiThreadedReader extends MultiThreadedAction +{ + private static final Log LOG = LogFactory.getLog(MultiThreadedReader.class); + Set readers_ = new HashSet(); + + public MultiThreadedReader(byte[] tableName, byte[] columnFamily) { + tableName_ = tableName; + columnFamily_ = columnFamily; + } + + public void setVerficationPercent(float verifyPercent) { + verifyPercent_ = verifyPercent; + } + + public void start(long startKey, long endKey, int numThreads) { + if(verbose_) { + LOG.debug("Inserting keys [" + startKey + ", " + endKey + ")"); + } + startKey_ = startKey; + endKey_ = endKey; + numThreads_ = numThreads; + + long threadStartKey = startKey; + long threadEndKey = startKey; + for(int i = 0; i < numThreads_; ++i) { + threadStartKey = threadEndKey; + threadEndKey = startKey + (i+1) * (endKey - startKey) / numThreads_; + HBaseReader writer = new HBaseReader(i, threadStartKey, threadEndKey); + readers_.add(writer); + } + numThreadsWorking_ = new AtomicInteger(readers_.size()); + for(HBaseReader reader : readers_) { + reader.start(); + } + + startReporter(); + } + + public static class HBaseReader extends Thread { + int id_; + List tables_ = new ArrayList(); + long startKey_; + long endKey_; + static int minDataSize_ = 256; + static int maxDataSize_ = 1024; + static DataGenerator dataGenerator_ = new DataGenerator(minDataSize_, maxDataSize_); + + public HBaseReader(int id, long startKey, long endKey) { + id_ = id; + for(HBaseConfiguration conf : HBaseTest.configList_) { + HTable table = HBaseUtils.getHTable(conf, tableName_); + tables_.add(table); + } + startKey_ = startKey; + endKey_ = endKey; + } + + public void run() { + verbose_ = true; + if(verbose_) { + LOG.info("Started thread #" + id_ + " for reads..."); + } + boolean repeatQuery = false; + Get get = null; + long start = 0; + long curKey = 0; + + for(;;) { + if(!repeatQuery) { + curKey = startKey_ + Math.abs(random_.nextLong())%(endKey_ - startKey_); + get = new Get(DataGenerator.paddedKey(curKey).getBytes()); + get.addFamily(columnFamily_); + // get.addColumn(columnFamily_, Bytes.toBytes("0")); + } + repeatQuery = false; + try { + if(verbose_ && repeatQuery) { + LOG.info("[" + id_ + "] " + (repeatQuery?"RE-Querying":"Querying") + 
" key = " + curKey + ", cf = " + new String(columnFamily_)); + } + queryKey( get, (random_.nextInt(100) < verifyPercent_) ); + } + catch (IOException e) { + numOpFailures_.addAndGet(1); + LOG.debug("[" + id_ + "] FAILED read, key = " + (curKey + "") + ", time = " + (System.currentTimeMillis() - start) + " ms"); + repeatQuery = true; + } + } + } + + public void queryKey(Get get, boolean verify) throws IOException { + String rowKey = new String(get.getRow()); + for(HTable table : tables_) { +// if(verbose_) { +// HRegionLocation hloc = table.getRegionLocation(Bytes.toBytes(rowKey)); +// LOG.info("Key = " + rowKey + ", RegoinServer: " + hloc.getServerAddress().getHostname()); +// } + // read the data + long start = System.currentTimeMillis(); + Result result = table.get(get); + cumulativeOpTime_.addAndGet(System.currentTimeMillis() - start); + numKeys_.addAndGet(1); + + // if we got no data report error + if(result.isEmpty()) { + numErrors_.addAndGet(1); + LOG.error("No data returned, tried to get actions for key = " + rowKey); + } + + if(result.getFamilyMap(columnFamily_) != null) { + + // increment number of columns read + numCols_.addAndGet(result.getFamilyMap(columnFamily_).size()); + + if (verify) { + // verify the result + List keyValues = result.list(); + for(KeyValue kv : keyValues) { + String actionId = new String(kv.getQualifier()); + String data = new String(kv.getValue()); + + // if something does not look right report it + if(!DataGenerator.verify(rowKey, actionId, data)) { + numErrors_.addAndGet(1); + LOG.error("Error checking data for key = " + rowKey + ", actionId = " + actionId); + } + } + + numKeysVerified_.addAndGet(1); + } + } + } + } + } +} Index: src/test/org/apache/hadoop/hbase/manual/HBaseTest.java =================================================================== --- src/test/org/apache/hadoop/hbase/manual/HBaseTest.java (revision 0) +++ src/test/org/apache/hadoop/hbase/manual/HBaseTest.java (revision 0) @@ -0,0 +1,216 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.manual; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import org.apache.commons.cli.BasicParser; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.manual.utils.HBaseUtils; +import org.apache.hadoop.hbase.manual.utils.KillProcessesAndVerify; +import org.apache.hadoop.hbase.manual.utils.MultiThreadedReader; +import org.apache.hadoop.hbase.manual.utils.MultiThreadedWriter; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; + +public class HBaseTest +{ + static { + // make the root logger display only errors + Logger.getRootLogger().setLevel(Level.ERROR); + // enable debugging for our package + Logger.getLogger("org.apache.hadoop.hbase.manual").setLevel(Level.DEBUG); + } + // global HBase configuration for the JVM - referenced by all classes. + public static List configList_ = new ArrayList(); + // startup options + public static Options options_ = new Options(); + // command line options object + public static CommandLine cmd_; + + // table name for the test + public static byte[] tableName_ = Bytes.toBytes("test1"); + // column families used by the test + public static byte[][] columnFamilies_ = { Bytes.toBytes("actions") }; + private static final Log LOG = LogFactory.getLog(HBaseTest.class); + static Random random_ = new Random(); + + // usage string for loading data + static final String OPT_USAGE_LOAD = " ::[:]"; + /** + * Reads the following params from the command line: + * ::[:] + */ + public void loadData() { + // parse command line data + String[] cols = cmd_.getOptionValue(OPT_LOAD).split(":"); + long startKey = 0; + long endKey = Long.parseLong(cols[0]); + long minColsPerKey = 1; + long maxColsPerKey = 2 * Long.parseLong(cols[1]); + int minColDataSize = Integer.parseInt(cols[2])/2; + int maxColDataSize = Integer.parseInt(cols[2]) * 3 / 2; + int numThreads = (endKey - startKey > 1000)? 20 : 1; + if (cols.length > 3) { + numThreads = Integer.parseInt(cols[3]); + } + + // print out the args + System.out.printf("Key range %d .. %d\n", startKey, endKey); + System.out.printf("Number of Columns/Key: %d..%d\n", minColsPerKey, maxColsPerKey); + System.out.printf("Data Size/Column: %d..%d bytes\n", minColDataSize, maxColDataSize); + System.out.printf("Client Threads: %d\n", numThreads); + + // start the writers + MultiThreadedWriter writer = new MultiThreadedWriter(tableName_, columnFamilies_[0]); + writer.setBulkLoad(true); + writer.setColumnsPerKey(minColsPerKey, maxColsPerKey); + writer.setDataSize(minColDataSize, maxColDataSize); + writer.start(startKey, endKey, numThreads); + System.out.printf("Started loading data..."); +// writer.waitForFinish(); + } + + static final String OPT_USAGE_READ = " :[::]"; + /** + * Reads the following params from the command line: + * ::[:] + */ + public void readData() { + // parse command line data + String[] cols = cmd_.getOptionValue(OPT_READ).split(":"); + long startKey = Long.parseLong(cols[0]); + long endKey = Long.parseLong(cols[1]); + int verifyPercent = Integer.parseInt(cols[2]); + int numThreads = (endKey - startKey > 1000)? 
20 : 1; + if (cols.length > 3) { + numThreads = Integer.parseInt(cols[3]); + } + + // print out the args + System.out.printf("Key range %d .. %d\n", startKey, endKey); + System.out.printf("Verify percent of keys: %d\n", verifyPercent); + System.out.printf("Client Threads: %d\n", numThreads); + + // start the readers + MultiThreadedReader reader = new MultiThreadedReader(tableName_, columnFamilies_[0]); + reader.setVerficationPercent(verifyPercent); + reader.start(startKey, endKey, numThreads); + } + + static final String OPT_USAGE_KILL = " ::::<#keys to verify>"; + /** + * Reads the following params from the command line: + * :: + * ::<# keys to verify after kill (# keys loaded in 4 minutes is a good number)> + */ + public void killAndVerify() { + // parse command line data + String[] cols = cmd_.getOptionValue(OPT_KILL).split(":"); + String clusterHBasePath = cols[0]; + String clusterHDFSPath = cols[1]; + int timeIntervalBetweenKills = Integer.parseInt(cols[2]); + int rSKillPercent = Integer.parseInt(cols[3]); + int numKeysToVerify = Integer.parseInt(cols[4]); + + System.out.printf("Time between kills: %d minutes\n", timeIntervalBetweenKills); + System.out.printf("RegionServer (rest is DataNode) kill percent: %d\n", rSKillPercent); + System.out.printf("Num keys to verify after killing: %d\n", numKeysToVerify); + + (new KillProcessesAndVerify(clusterHBasePath, clusterHDFSPath, timeIntervalBetweenKills, rSKillPercent, numKeysToVerify)).start(); + System.out.printf("Started kill test..."); + } + + public static void main(String[] args) { + try { + // parse the command line args + initAndParseArgs(args); + + HBaseTest hBaseTest = new HBaseTest(); + // create the HBase configuration from ZooKeeper node + String[] zkNodes = cmd_.getOptionValue(OPT_ZKNODE).split(":"); + for(String zkNode : zkNodes) { + HBaseConfiguration conf = HBaseUtils.getHBaseConfFromZkNode(zkNode); + LOG.info("Adding hbase.zookeeper.quorum = " + conf.get("hbase.zookeeper.quorum")); + configList_.add(conf); + } + // create tables if needed + for(HBaseConfiguration conf : configList_) { + HBaseUtils.createTableIfNotExists(conf, tableName_, columnFamilies_); + } + + // write some test data in an infinite loop if needed + if(cmd_.hasOption(OPT_LOAD)) { + hBaseTest.loadData(); + } + // kill servers and verify the data integrity + if(cmd_.hasOption(OPT_KILL)) { + hBaseTest.killAndVerify(); + } + // read the test data in an infinite loop + if(cmd_.hasOption(OPT_READ)) { + hBaseTest.readData(); + } + } catch(Exception e) { + e.printStackTrace(); + printUsage(); + } + } + + private static String USAGE; + private static final String HEADER = "HBase Tests"; + private static final String FOOTER = ""; + private static final String OPT_ZKNODE = "zk"; + private static final String OPT_LOAD = "load"; + private static final String OPT_READ = "read"; + private static final String OPT_KILL = "kill"; + static void initAndParseArgs(String[] args) throws ParseException { + // set the usage object + USAGE = "run_test HBaseTest " + + " -" + OPT_ZKNODE + " " + + " -" + OPT_LOAD + OPT_USAGE_LOAD + + " -" + OPT_READ + OPT_USAGE_READ + + " -" + OPT_KILL + OPT_USAGE_KILL; + // add options + options_.addOption(OPT_ZKNODE , true, "Zookeeper node in the HBase cluster"); + options_.addOption(OPT_LOAD , true, OPT_USAGE_LOAD); + options_.addOption(OPT_READ , true, OPT_USAGE_READ); + options_.addOption(OPT_KILL , true, OPT_USAGE_KILL); + // parse the passed in options + CommandLineParser parser = new BasicParser(); + cmd_ = parser.parse(options_, args); + } + 
+  private static void printUsage() {
+    HelpFormatter helpFormatter = new HelpFormatter();
+    helpFormatter.setWidth(80);
+    helpFormatter.printHelp(USAGE, HEADER, options_, FOOTER);
+  }
+}
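
Note for anyone trying the patch out: the sketch below is not part of the change. It is a minimal driver showing how the classes above compose, roughly what HBaseTest.main() wires up for the -zk, -load and -read options. It only uses methods that appear in the patch, but the driver class name, the ZooKeeper quorum host, the key range and the thread counts are illustrative placeholders.

package org.apache.hadoop.hbase.manual;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.manual.utils.HBaseUtils;
import org.apache.hadoop.hbase.manual.utils.MultiThreadedReader;
import org.apache.hadoop.hbase.manual.utils.MultiThreadedWriter;
import org.apache.hadoop.hbase.util.Bytes;

public class LoadAndReadSketch {
  public static void main(String[] args) throws Exception {
    byte[] table = Bytes.toBytes("test1");
    byte[][] families = { Bytes.toBytes("actions") };

    // Build a configuration from a ZooKeeper quorum host (placeholder) and register it,
    // the same way HBaseTest.main() does before starting any worker threads.
    HBaseConfiguration conf = HBaseUtils.getHBaseConfFromZkNode("zk-host.example.com");
    HBaseTest.configList_.add(conf);
    HBaseUtils.createTableIfNotExists(conf, table, families);

    // Write keys [0, 10000) with 10 threads, 1..10 columns per key and 256..1024 byte
    // values -- the same knobs loadData() derives from the -load option.
    MultiThreadedWriter writer = new MultiThreadedWriter(table, families[0]);
    writer.setColumnsPerKey(1, 10);
    writer.setDataSize(256, 1024);
    writer.start(0, 10000, 10);
    writer.waitForFinish();

    // Read the same range back, verifying 10% of the rows against DataGenerator.
    // Reader threads loop until the JVM exits, mirroring the -read option.
    MultiThreadedReader reader = new MultiThreadedReader(table, families[0]);
    reader.setVerficationPercent(10);
    reader.start(0, 10000, 10);
  }
}

The -kill path composes the same way: KillProcessesAndVerify is constructed with the HBase and HDFS install paths plus the kill interval, RS-kill percentage and number of keys to verify, then started as a thread. Since it shells out over ssh to the chosen node, it appears to assume passwordless ssh from the test driver to the region server and datanode hosts.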