Index: src/main/java/org/apache/hadoop/hbase/mapreduce/MultithreadedTableMapper.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/mapreduce/MultithreadedTableMapper.java	(revision 0)
+++ src/main/java/org/apache/hadoop/hbase/mapreduce/MultithreadedTableMapper.java	(revision 0)
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.StatusReporter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+
+
+/**
+ * Multithreaded implementation for {@link org.apache.hadoop.hbase.mapreduce.TableMapper}.
+ * <p>
+ * It can be used instead when the Map operation is not CPU
+ * bound in order to improve throughput.
+ * <p>
+ * Mapper implementations using this MapRunnable must be thread-safe.
+ * <p>
+ * The Map-Reduce job has to be configured with the mapper to use via
+ * {@link #setMapperClass(Job, Class)} and
+ * the number of threads the thread-pool can use with the
+ * {@link #setNumberOfThreads(Job, int)} method. The default
+ * value is 10 threads.
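+ * <p>
+ * A minimal configuration sketch (the table name "mytable" and the mapper
+ * MyMapper are hypothetical placeholders, not part of this class):
+ * <pre>
+ * Job job = new Job(conf, "multithreaded mapper example");
+ * Scan scan = new Scan();
+ * TableMapReduceUtil.initTableMapperJob("mytable", scan,
+ *     MultithreadedTableMapper.class, ImmutableBytesWritable.class,
+ *     Put.class, job);
+ * // MyMapper is any thread-safe TableMapper&lt;ImmutableBytesWritable, Put&gt; subclass
+ * MultithreadedTableMapper.setMapperClass(job, MyMapper.class);
+ * MultithreadedTableMapper.setNumberOfThreads(job, 10);
+ * </pre>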
+ * <p>
+ */
+
+public class MultithreadedTableMapper<K2, V2> extends TableMapper<K2, V2> {
+  private static final Log LOG = LogFactory.getLog(MultithreadedTableMapper.class);
+  private Class<? extends Mapper<ImmutableBytesWritable, Result, K2, V2>> mapClass;
+  private Context outer;
+  private ExecutorService executor;
+  public static final String NUMBER_OF_THREADS = "hbase.mapreduce.multithreadedmapper.threads";
+  public static final String MAPPER_CLASS = "hbase.mapreduce.multithreadedmapper.mapclass";
+
+  /**
+   * The number of threads in the thread pool that will run the map function.
+   * @param job the job
+   * @return the number of threads
+   */
+  public static int getNumberOfThreads(JobContext job) {
+    return job.getConfiguration().
+        getInt(NUMBER_OF_THREADS, 10);
+  }
+
+  /**
+   * Set the number of threads in the pool for running maps.
+   * @param job the job to modify
+   * @param threads the new number of threads
+   */
+  public static void setNumberOfThreads(Job job, int threads) {
+    job.getConfiguration().setInt(NUMBER_OF_THREADS,
+        threads);
+  }
+
+  /**
+   * Get the application's mapper class.
+   * @param <K2> the map's output key type
+   * @param <V2> the map's output value type
+   * @param job the job
+   * @return the mapper class to run
+   */
+  @SuppressWarnings("unchecked")
+  public static <K2, V2>
+  Class<Mapper<ImmutableBytesWritable, Result, K2, V2>> getMapperClass(JobContext job) {
+    return (Class<Mapper<ImmutableBytesWritable, Result, K2, V2>>)
+        job.getConfiguration().getClass(MAPPER_CLASS,
+            Mapper.class);
+  }
+
+  /**
+   * Set the application's mapper class.
+   * @param <K2> the map output key type
+   * @param <V2> the map output value type
+   * @param job the job to modify
+   * @param cls the class to use as the mapper
+   */
+  public static <K2, V2>
+  void setMapperClass(Job job,
+      Class<? extends Mapper<ImmutableBytesWritable, Result, K2, V2>> cls) {
+    if (MultithreadedTableMapper.class.isAssignableFrom(cls)) {
+      throw new IllegalArgumentException("Can't have recursive " +
+          "MultithreadedTableMapper instances.");
+    }
+    job.getConfiguration().setClass(MAPPER_CLASS,
+        cls, Mapper.class);
+  }
+
+  /**
+   * Run the application's maps using a thread pool.
+   */
+  @Override
+  public void run(Context context) throws IOException, InterruptedException {
+    outer = context;
+    int numberOfThreads = getNumberOfThreads(context);
+    mapClass = getMapperClass(context);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Configuring multithread runner to use " + numberOfThreads +
+          " threads");
+    }
+    executor = Executors.newFixedThreadPool(numberOfThreads);
+    for (int i = 0; i < numberOfThreads; ++i) {
+      MapRunner thread = new MapRunner(context);
+      executor.execute(thread);
+    }
+    executor.shutdown();
+    while (!executor.isTerminated()) {
+      // wait till all the threads are done
+      Thread.sleep(1000);
+    }
+  }
+
+  private class SubMapRecordReader
+      extends RecordReader<ImmutableBytesWritable, Result> {
+    private ImmutableBytesWritable key;
+    private Result value;
+    private Configuration conf;
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+      return 0;
+    }
+
+    @Override
+    public void initialize(InputSplit split,
+        TaskAttemptContext context
+        ) throws IOException, InterruptedException {
+      conf = context.getConfiguration();
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      synchronized (outer) {
+        if (!outer.nextKeyValue()) {
+          return false;
+        }
+        key = ReflectionUtils.copy(outer.getConfiguration(),
+            outer.getCurrentKey(), key);
+        value = ReflectionUtils.copy(conf, outer.getCurrentValue(), value);
+        return true;
+      }
+    }
+
+    @Override
+    public ImmutableBytesWritable getCurrentKey() {
+      return key;
+    }
+
+    @Override
+    public Result getCurrentValue() {
+      return value;
+    }
+  }
+
+  private class SubMapRecordWriter extends RecordWriter<K2, V2> {
+
+    @Override
+    public void close(TaskAttemptContext context) throws IOException,
+        InterruptedException {
+    }
+
+    @Override
+    public void write(K2 key, V2 value) throws IOException,
+        InterruptedException {
+      synchronized (outer) {
+        outer.write(key, value);
+      }
+    }
+  }
+
+  private class SubMapStatusReporter extends StatusReporter {
+
+    @Override
+    public Counter getCounter(Enum<?> name) {
+      return outer.getCounter(name);
+    }
+
+    @Override
+    public Counter getCounter(String group, String name) {
+      return outer.getCounter(group, name);
+    }
+
+    @Override
+    public void progress() {
+      outer.progress();
+    }
+
+    @Override
+    public void setStatus(String status) {
+      outer.setStatus(status);
+    }
+  }
+
+  private class MapRunner implements Runnable {
+    private Mapper<ImmutableBytesWritable, Result, K2, V2> mapper;
+    private Context subcontext;
+    private Throwable throwable;
+
+    MapRunner(Context context) throws IOException, InterruptedException {
+      mapper = ReflectionUtils.newInstance(mapClass,
+          context.getConfiguration());
+      subcontext = new Context(outer.getConfiguration(),
+          outer.getTaskAttemptID(),
+          new SubMapRecordReader(),
+          new SubMapRecordWriter(),
+          context.getOutputCommitter(),
+          new SubMapStatusReporter(),
+          outer.getInputSplit());
+    }
+
+    @Override
+    public void run() {
+      try {
+        mapper.run(subcontext);
+      } catch (Throwable ie) {
+        throwable = ie;
+      }
+    }
+  }
+}
\ No newline at end of file
Index: src/test/java/org/apache/hadoop/hbase/mapreduce/TestMulitthreadedTableMapper.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/mapreduce/TestMulitthreadedTableMapper.java	(revision 0)
+++ src/test/java/org/apache/hadoop/hbase/mapreduce/TestMulitthreadedTableMapper.java	(revision 0)
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.fail;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test Map/Reduce job over HBase tables. The map/reduce process we're testing
+ * on our tables is simple - take every row in the table, reverse the value of
+ * a particular cell, and write it back to the table.
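+ * <p>
+ * An illustrative sketch of the per-row transformation (hypothetical values,
+ * not taken from the actual test data):
+ * <pre>
+ * // input:  row r, contents:&lt;empty&gt; = "abcde"
+ * // output: row r, text:&lt;empty&gt;     = "edcba"
+ * </pre>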
+ */
+@Category(LargeTests.class)
+public class TestMulitthreadedTableMapper {
+  private static final Log LOG = LogFactory.getLog(TestMulitthreadedTableMapper.class);
+  private static final HBaseTestingUtility UTIL =
+      new HBaseTestingUtility();
+  static final String MULTI_REGION_TABLE_NAME = "mrtest";
+  static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");
+  static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text");
+  static final int NUMBER_OF_THREADS = 10;
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    HTableDescriptor desc = new HTableDescriptor(MULTI_REGION_TABLE_NAME);
+    desc.addFamily(new HColumnDescriptor(INPUT_FAMILY));
+    desc.addFamily(new HColumnDescriptor(OUTPUT_FAMILY));
+    UTIL.startMiniCluster();
+    HBaseAdmin admin = new HBaseAdmin(UTIL.getConfiguration());
+    admin.createTable(desc, HBaseTestingUtility.KEYS);
+    UTIL.startMiniMapReduceCluster();
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    UTIL.shutdownMiniMapReduceCluster();
+    UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Pass the given key and processed record to reduce
+   */
+  public static class ProcessContentsMapper
+      extends TableMapper<ImmutableBytesWritable, Put> {
+
+    /**
+     * Pass the key, and reversed value to reduce
+     *
+     * @param key
+     * @param value
+     * @param context
+     * @throws IOException
+     */
+    public void map(ImmutableBytesWritable key, Result value,
+        Context context)
+        throws IOException, InterruptedException {
+      if (value.size() != 1) {
+        throw new IOException("There should only be one input column");
+      }
+      Map<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>
+          cf = value.getMap();
+      if (!cf.containsKey(INPUT_FAMILY)) {
+        throw new IOException("Wrong input columns. Missing: '" +
+            Bytes.toString(INPUT_FAMILY) + "'.");
+      }
+      // Get the original value and reverse it
+      String originalValue = new String(value.getValue(INPUT_FAMILY, null),
+          HConstants.UTF8_ENCODING);
+      StringBuilder newValue = new StringBuilder(originalValue);
+      newValue.reverse();
+      // Now set the value to be collected
+      Put outval = new Put(key.get());
+      outval.add(OUTPUT_FAMILY, null, Bytes.toBytes(newValue.toString()));
+      context.write(key, outval);
+    }
+  }
+
+  /**
+   * Test MultithreadedTableMapper map/reduce against a multi-region table
+   * @throws IOException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testMultithreadedTableMapper()
+      throws IOException, InterruptedException, ClassNotFoundException {
+    runTestOnTable(new HTable(new Configuration(UTIL.getConfiguration()),
+        MULTI_REGION_TABLE_NAME));
+  }
+
+  private void runTestOnTable(HTable table)
+      throws IOException, InterruptedException, ClassNotFoundException {
+    Job job = null;
+    try {
+      LOG.info("Before map/reduce startup");
+      job = new Job(table.getConfiguration(), "process column contents");
+      job.setNumReduceTasks(1);
+      Scan scan = new Scan();
+      scan.addFamily(INPUT_FAMILY);
+      TableMapReduceUtil.initTableMapperJob(
+          Bytes.toString(table.getTableName()), scan,
+          MultithreadedTableMapper.class, ImmutableBytesWritable.class,
+          Put.class, job);
+      MultithreadedTableMapper.setMapperClass(job, ProcessContentsMapper.class);
+      MultithreadedTableMapper.setNumberOfThreads(job, NUMBER_OF_THREADS);
+      TableMapReduceUtil.initTableReducerJob(
+          Bytes.toString(table.getTableName()),
+          IdentityTableReducer.class, job);
+      FileOutputFormat.setOutputPath(job, new Path("test"));
+      LOG.info("Started " + Bytes.toString(table.getTableName()));
+      job.waitForCompletion(true);
+      LOG.info("After map/reduce completion");
+      // verify map-reduce results
+      verify(Bytes.toString(table.getTableName()));
+    } finally {
+      table.close();
+      if (job != null) {
+        FileUtil.fullyDelete(
+            new File(job.getConfiguration().get("hadoop.tmp.dir")));
+      }
+    }
+  }
+
+  private void verify(String tableName) throws IOException {
+    HTable table = new HTable(new Configuration(UTIL.getConfiguration()), tableName);
+    boolean verified = false;
+    long pause = UTIL.getConfiguration().getLong("hbase.client.pause", 5 * 1000);
+    int numRetries = UTIL.getConfiguration().getInt("hbase.client.retries.number", 5);
+    for (int i = 0; i < numRetries; i++) {
+      try {
+        LOG.info("Verification attempt #" + i);
+        verifyAttempt(table);
+        verified = true;
+        break;
+      } catch (NullPointerException e) {
+        // If here, a cell was empty. Presume its because updates came in
+        // after the scanner had been opened. Wait a while and retry.
+        LOG.debug("Verification attempt failed: " + e.getMessage());
+      }
+      try {
+        Thread.sleep(pause);
+      } catch (InterruptedException e) {
+        // continue
+      }
+    }
+    assertTrue(verified);
+    table.close();
+  }
+
+  /**
+   * Looks at every value of the mapreduce output and verifies that indeed
+   * the values have been reversed.
+   *
+   * @param table Table to scan.
+   * @throws IOException
+   * @throws NullPointerException if we failed to find a cell value
+   */
+  private void verifyAttempt(final HTable table)
+      throws IOException, NullPointerException {
+    Scan scan = new Scan();
+    scan.addFamily(INPUT_FAMILY);
+    scan.addFamily(OUTPUT_FAMILY);
+    ResultScanner scanner = table.getScanner(scan);
+    try {
+      for (Result r : scanner) {
+        if (LOG.isDebugEnabled()) {
+          if (r.size() > 2) {
+            throw new IOException("Too many results, expected 2 got " +
+                r.size());
+          }
+        }
+        byte[] firstValue = null;
+        byte[] secondValue = null;
+        int count = 0;
+        for (KeyValue kv : r.list()) {
+          if (count == 0) {
+            firstValue = kv.getValue();
+          } else if (count == 1) {
+            secondValue = kv.getValue();
+          } else if (count == 2) {
+            break;
+          }
+          count++;
+        }
+        String first = "";
+        if (firstValue == null) {
+          throw new NullPointerException(Bytes.toString(r.getRow()) +
+              ": first value is null");
+        }
+        first = new String(firstValue, HConstants.UTF8_ENCODING);
+        String second = "";
+        if (secondValue == null) {
+          throw new NullPointerException(Bytes.toString(r.getRow()) +
+              ": second value is null");
+        }
+        byte[] secondReversed = new byte[secondValue.length];
+        for (int i = 0, j = secondValue.length - 1; j >= 0; j--, i++) {
+          secondReversed[i] = secondValue[j];
+        }
+        second = new String(secondReversed, HConstants.UTF8_ENCODING);
+        if (first.compareTo(second) != 0) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("second key is not the reverse of first. row=" +
+                Bytes.toStringBinary(r.getRow()) + ", first value=" + first +
+                ", second value=" + second);
+          }
+          fail();
+        }
+      }
+    } finally {
+      scanner.close();
+    }
+  }
+
+  @org.junit.Rule
+  public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
+      new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
+}