Index: src/main/java/org/apache/hadoop/hbase/mapreduce/MultithreadedTableMapper.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/mapreduce/MultithreadedTableMapper.java (revision 0)
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/MultithreadedTableMapper.java (revision 0)
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.StatusReporter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+
+
+/**
+ * Multithreaded implementation for {@link org.apache.hadoop.hbase.mapreduce.TableMapper}.
+ *
+ * It can be used instead when the Map operation is not CPU
+ * bound in order to improve throughput.
+ *
+ * Mapper implementations used with this class must be thread-safe.
+ *
+ * The Map-Reduce job has to be configured with the mapper to use via
+ * {@link #setMapperClass(Job, Class)} and
+ * the number of threads in the thread-pool via
+ * {@link #setNumberOfThreads(Job, int)}. The default
+ * value is 10 threads.
+ *
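+ * <p>A minimal setup sketch, mirroring the test in this patch
+ * ({@code MyMapper} and the table name are illustrative):
+ * <pre>
+ * Job job = new Job(conf, "process column contents");
+ * Scan scan = new Scan();
+ * TableMapReduceUtil.initTableMapperJob("mytable", scan,
+ *   MultithreadedTableMapper.class, ImmutableBytesWritable.class,
+ *   Put.class, job);
+ * MultithreadedTableMapper.setMapperClass(job, MyMapper.class);
+ * MultithreadedTableMapper.setNumberOfThreads(job, 10);
+ * </pre>
+ *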
+ */
+
+public class MultithreadedTableMapper<K2, V2> extends TableMapper<K2, V2> {
+ private static final Log LOG = LogFactory.getLog(MultithreadedTableMapper.class);
+ private Class<? extends Mapper<ImmutableBytesWritable, Result, K2, V2>> mapClass;
+ private Context outer;
+ private ExecutorService executor;
+ public static final String NUMBER_OF_THREADS = "hbase.mapreduce.multithreadedmapper.threads";
+ public static final String MAPPER_CLASS = "hbase.mapreduce.multithreadedmapper.mapclass";
+
+ /**
+ * The number of threads in the thread pool that will run the map function.
+ * @param job the job
+ * @return the number of threads
+ */
+ public static int getNumberOfThreads(JobContext job) {
+ return job.getConfiguration().
+ getInt(NUMBER_OF_THREADS, 10);
+ }
+
+ /**
+ * Set the number of threads in the pool for running maps.
+ * @param job the job to modify
+ * @param threads the new number of threads
+ */
+ public static void setNumberOfThreads(Job job, int threads) {
+ job.getConfiguration().setInt(NUMBER_OF_THREADS,
+ threads);
+ }
+
+ /**
+ * Get the application's mapper class.
+ * @param <K2> the map's output key type
+ * @param <V2> the map's output value type
+ * @param job the job
+ * @return the mapper class to run
+ */
+ @SuppressWarnings("unchecked")
+ public static <K2, V2>
+ Class<Mapper<ImmutableBytesWritable, Result, K2, V2>> getMapperClass(JobContext job) {
+ return (Class<Mapper<ImmutableBytesWritable, Result, K2, V2>>)
+ job.getConfiguration().getClass(MAPPER_CLASS,
+ Mapper.class);
+ }
+
+ /**
+ * Set the application's mapper class.
+ * @param <K2> the map output key type
+ * @param <V2> the map output value type
+ * @param job the job to modify
+ * @param cls the class to use as the mapper
+ */
+ public static <K2, V2>
+ void setMapperClass(Job job,
+ Class<? extends Mapper<ImmutableBytesWritable, Result, K2, V2>> cls) {
+ if (MultithreadedTableMapper.class.isAssignableFrom(cls)) {
+ throw new IllegalArgumentException("Can't have recursive " +
+ "MultithreadedTableMapper instances.");
+ }
+ job.getConfiguration().setClass(MAPPER_CLASS,
+ cls, Mapper.class);
+ }
+
+ /**
+ * Run the application's maps using a thread pool.
+ */
+ @Override
+ public void run(Context context) throws IOException, InterruptedException {
+ outer = context;
+ int numberOfThreads = getNumberOfThreads(context);
+ mapClass = getMapperClass(context);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Configuring multithread runner to use " + numberOfThreads +
+ " threads");
+ }
+ executor = Executors.newFixedThreadPool(numberOfThreads);
+ for (int i = 0; i < numberOfThreads; ++i) {
+ MapRunner thread = new MapRunner(context);
+ executor.execute(thread);
+ }
+ executor.shutdown();
+ while (!executor.isTerminated()) {
+ // wait till all the threads are done
+ Thread.sleep(1000);
+ }
+ }
+
+ private class SubMapRecordReader
+ extends RecordReader<ImmutableBytesWritable, Result> {
+ private ImmutableBytesWritable key;
+ private Result value;
+ private Configuration conf;
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ return 0;
+ }
+
+ @Override
+ public void initialize(InputSplit split,
+ TaskAttemptContext context
+ ) throws IOException, InterruptedException {
+ conf = context.getConfiguration();
+ }
+
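+ /**
+ * Advances the shared outer reader while holding its lock, so no two
+ * threads ever consume the same row. The key and value are deep-copied
+ * so each thread owns its pair even if the underlying reader reuses
+ * objects across calls.
+ */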
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ synchronized (outer) {
+ if (!outer.nextKeyValue()) {
+ return false;
+ }
+ key = ReflectionUtils.copy(outer.getConfiguration(),
+ outer.getCurrentKey(), key);
+ value = ReflectionUtils.copy(conf, outer.getCurrentValue(), value);
+ return true;
+ }
+ }
+
+ @Override
+ public ImmutableBytesWritable getCurrentKey() {
+ return key;
+ }
+
+ @Override
+ public Result getCurrentValue() {
+ return value;
+ }
+ }
+
+ private class SubMapRecordWriter extends RecordWriter<K2, V2> {
+
+ @Override
+ public void close(TaskAttemptContext context) throws IOException,
+ InterruptedException {
+ }
+
+ @Override
+ public void write(K2 key, V2 value) throws IOException,
+ InterruptedException {
+ synchronized (outer) {
+ outer.write(key, value);
+ }
+ }
+ }
+
+ private class SubMapStatusReporter extends StatusReporter {
+
+ @Override
+ public Counter getCounter(Enum<?> name) {
+ return outer.getCounter(name);
+ }
+
+ @Override
+ public Counter getCounter(String group, String name) {
+ return outer.getCounter(group, name);
+ }
+
+ @Override
+ public void progress() {
+ outer.progress();
+ }
+
+ @Override
+ public void setStatus(String status) {
+ outer.setStatus(status);
+ }
+ }
+
+ private class MapRunner implements Runnable {
+ private Mapper<ImmutableBytesWritable, Result, K2, V2> mapper;
+ private Context subcontext;
+ private Throwable throwable;
+
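+ /**
+ * Each runner gets its own mapper instance and a private Context whose
+ * reader and writer delegate to the shared outer context under
+ * synchronization; any failure from the mapper thread is captured in
+ * {@code throwable}.
+ */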
+ MapRunner(Context context) throws IOException, InterruptedException {
+ mapper = ReflectionUtils.newInstance(mapClass,
+ context.getConfiguration());
+ subcontext = new Context(outer.getConfiguration(),
+ outer.getTaskAttemptID(),
+ new SubMapRecordReader(),
+ new SubMapRecordWriter(),
+ context.getOutputCommitter(),
+ new SubMapStatusReporter(),
+ outer.getInputSplit());
+ }
+
+ @Override
+ public void run() {
+ try {
+ mapper.run(subcontext);
+ } catch (Throwable ie) {
+ throwable = ie;
+ }
+ }
+ }
+}
\ No newline at end of file
Index: src/test/java/org/apache/hadoop/hbase/mapreduce/TestMulitthreadedTableMapper.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/mapreduce/TestMulitthreadedTableMapper.java (revision 0)
+++ src/test/java/org/apache/hadoop/hbase/mapreduce/TestMulitthreadedTableMapper.java (revision 0)
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.fail;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test Map/Reduce job over HBase tables. The map/reduce process we're testing
+ * on our tables is simple - take every row in the table, reverse the value of
+ * a particular cell, and write it back to the table.
+ */
+@Category(LargeTests.class)
+public class TestMulitthreadedTableMapper {
+ private static final Log LOG = LogFactory.getLog(TestMulitthreadedTableMapper.class);
+ private static final HBaseTestingUtility UTIL =
+ new HBaseTestingUtility();
+ static final String MULTI_REGION_TABLE_NAME = "mrtest";
+ static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
+ static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text");
+ static final int NUMBER_OF_THREADS = 10;
+
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ HTableDescriptor desc = new HTableDescriptor(MULTI_REGION_TABLE_NAME);
+ desc.addFamily(new HColumnDescriptor(INPUT_FAMILY));
+ desc.addFamily(new HColumnDescriptor(OUTPUT_FAMILY));
+ UTIL.startMiniCluster();
+ HBaseAdmin admin = new HBaseAdmin(UTIL.getConfiguration());
+ admin.createTable(desc, HBaseTestingUtility.KEYS);
+ UTIL.startMiniMapReduceCluster();
+ }
+
+ @AfterClass
+ public static void afterClass() throws Exception {
+ UTIL.shutdownMiniMapReduceCluster();
+ UTIL.shutdownMiniCluster();
+ }
+
+ /**
+ * Pass the given key and processed record to reduce.
+ */
+ public static class ProcessContentsMapper
+ extends TableMapper<ImmutableBytesWritable, Put> {
+
+ /**
+ * Pass the key, and reversed value to reduce
+ *
+ * @param key
+ * @param value
+ * @param context
+ * @throws IOException
+ */
+ public void map(ImmutableBytesWritable key, Result value,
+ Context context)
+ throws IOException, InterruptedException {
+ if (value.size() != 1) {
+ throw new IOException("There should only be one input column");
+ }
+ Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
+ cf = value.getMap();
+ if(!cf.containsKey(INPUT_FAMILY)) {
+ throw new IOException("Wrong input columns. Missing: '" +
+ Bytes.toString(INPUT_FAMILY) + "'.");
+ }
+ // Get the original value and reverse it
+ String originalValue = new String(value.getValue(INPUT_FAMILY, null),
+ HConstants.UTF8_ENCODING);
+ StringBuilder newValue = new StringBuilder(originalValue);
+ newValue.reverse();
+ // Now set the value to be collected
+ Put outval = new Put(key.get());
+ outval.add(OUTPUT_FAMILY, null, Bytes.toBytes(newValue.toString()));
+ context.write(key, outval);
+ }
+ }
+
+ /**
+ * Test MultithreadedTableMapper map/reduce against a multi-region table
+ * @throws IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ */
+ @Test
+ public void testMultithreadedTableMapper()
+ throws IOException, InterruptedException, ClassNotFoundException {
+ runTestOnTable(new HTable(new Configuration(UTIL.getConfiguration()),
+ MULTI_REGION_TABLE_NAME));
+ }
+
+ private void runTestOnTable(HTable table)
+ throws IOException, InterruptedException, ClassNotFoundException {
+ Job job = null;
+ try {
+ LOG.info("Before map/reduce startup");
+ job = new Job(table.getConfiguration(), "process column contents");
+ job.setNumReduceTasks(1);
+ Scan scan = new Scan();
+ scan.addFamily(INPUT_FAMILY);
+ TableMapReduceUtil.initTableMapperJob(
+ Bytes.toString(table.getTableName()), scan,
+ MultithreadedTableMapper.class, ImmutableBytesWritable.class,
+ Put.class, job);
+ MultithreadedTableMapper.setMapperClass(job, ProcessContentsMapper.class);
+ MultithreadedTableMapper.setNumberOfThreads(job, NUMBER_OF_THREADS);
+ TableMapReduceUtil.initTableReducerJob(
+ Bytes.toString(table.getTableName()),
+ IdentityTableReducer.class, job);
+ FileOutputFormat.setOutputPath(job, new Path("test"));
+ LOG.info("Started " + Bytes.toString(table.getTableName()));
+ job.waitForCompletion(true);
+ LOG.info("After map/reduce completion");
+ // verify map-reduce results
+ verify(Bytes.toString(table.getTableName()));
+ } finally {
+ table.close();
+ if (job != null) {
+ FileUtil.fullyDelete(
+ new File(job.getConfiguration().get("hadoop.tmp.dir")));
+ }
+ }
+ }
+
+ private void verify(String tableName) throws IOException {
+ HTable table = new HTable(new Configuration(UTIL.getConfiguration()), tableName);
+ boolean verified = false;
+ long pause = UTIL.getConfiguration().getLong("hbase.client.pause", 5 * 1000);
+ int numRetries = UTIL.getConfiguration().getInt("hbase.client.retries.number", 5);
+ for (int i = 0; i < numRetries; i++) {
+ try {
+ LOG.info("Verification attempt #" + i);
+ verifyAttempt(table);
+ verified = true;
+ break;
+ } catch (NullPointerException e) {
+ // If here, a cell was empty. Presume it's because updates came in
+ // after the scanner had been opened. Wait a while and retry.
+ LOG.debug("Verification attempt failed: " + e.getMessage());
+ }
+ try {
+ Thread.sleep(pause);
+ } catch (InterruptedException e) {
+ // continue
+ }
+ }
+ assertTrue(verified);
+ table.close();
+ }
+
+ /**
+ * Looks at every value of the mapreduce output and verifies that indeed
+ * the values have been reversed.
+ *
+ * @param table Table to scan.
+ * @throws IOException
+ * @throws NullPointerException if we failed to find a cell value
+ */
+ private void verifyAttempt(final HTable table)
+ throws IOException, NullPointerException {
+ Scan scan = new Scan();
+ scan.addFamily(INPUT_FAMILY);
+ scan.addFamily(OUTPUT_FAMILY);
+ ResultScanner scanner = table.getScanner(scan);
+ try {
+ for (Result r : scanner) {
+ if (r.size() > 2) {
+ throw new IOException("Too many results, expected 2 got " +
+ r.size());
+ }
+ byte[] firstValue = null;
+ byte[] secondValue = null;
+ int count = 0;
+ for (KeyValue kv : r.list()) {
+ if (count == 0) {
+ firstValue = kv.getValue();
+ } else if (count == 1) {
+ secondValue = kv.getValue();
+ } else if (count == 2) {
+ break;
+ }
+ count++;
+ }
+ String first = "";
+ if (firstValue == null) {
+ throw new NullPointerException(Bytes.toString(r.getRow()) +
+ ": first value is null");
+ }
+ first = new String(firstValue, HConstants.UTF8_ENCODING);
+ String second = "";
+ if (secondValue == null) {
+ throw new NullPointerException(Bytes.toString(r.getRow()) +
+ ": second value is null");
+ }
+ byte[] secondReversed = new byte[secondValue.length];
+ for (int i = 0, j = secondValue.length - 1; j >= 0; j--, i++) {
+ secondReversed[i] = secondValue[j];
+ }
+ second = new String(secondReversed, HConstants.UTF8_ENCODING);
+ if (first.compareTo(second) != 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("second key is not the reverse of first. row=" +
+ Bytes.toStringBinary(r.getRow()) + ", first value=" + first +
+ ", second value=" + second);
+ }
+ fail();
+ }
+ }
+ } finally {
+ scanner.close();
+ }
+ }
+
+ @org.junit.Rule
+ public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
+ new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
+}
+