diff --git src/main/java/org/apache/hadoop/hbase/mapreduce/MultithreadedTableMapper.java src/main/java/org/apache/hadoop/hbase/mapreduce/MultithreadedTableMapper.java index eb96ac6..7126c4e 100644 --- src/main/java/org/apache/hadoop/hbase/mapreduce/MultithreadedTableMapper.java +++ src/main/java/org/apache/hadoop/hbase/mapreduce/MultithreadedTableMapper.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.mapreduce; import java.io.IOException; import java.lang.reflect.Constructor; +import java.lang.reflect.Method; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -31,11 +32,14 @@ import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.MapContext; import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.StatusReporter; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.util.ReflectionUtils; @@ -238,16 +242,19 @@ public class MultithreadedTableMapper extends TableMapper { mapper = ReflectionUtils.newInstance(mapClass, context.getConfiguration()); try { + // for hadoop 1.0 Constructor c = context.getClass().getConstructor( + Mapper.class, Configuration.class, - outer.getTaskAttemptID().getClass(), - SubMapRecordReader.class, - SubMapRecordWriter.class, - context.getOutputCommitter().getClass(), - SubMapStatusReporter.class, - outer.getInputSplit().getClass()); + TaskAttemptID.class, + RecordReader.class, + RecordWriter.class, + OutputCommitter.class, + StatusReporter.class, + InputSplit.class); c.setAccessible(true); subcontext = (Context) c.newInstance( + mapper, outer.getConfiguration(), outer.getTaskAttemptID(), new SubMapRecordReader(), @@ -256,8 +263,32 @@ public class MultithreadedTableMapper extends TableMapper { new SubMapStatusReporter(), outer.getInputSplit()); } catch (Exception e) { - // rethrow as IOE - throw new IOException(e); + try { + // for hadoop 0.23 + Constructor c = Class.forName("org.apache.hadoop.mapreduce.task.MapContextImpl").getConstructor( + Configuration.class, + TaskAttemptID.class, + RecordReader.class, + RecordWriter.class, + OutputCommitter.class, + StatusReporter.class, + InputSplit.class); + c.setAccessible(true); + MapContext mc = (MapContext) c.newInstance( + outer.getConfiguration(), + outer.getTaskAttemptID(), + new SubMapRecordReader(), + new SubMapRecordWriter(), + context.getOutputCommitter(), + new SubMapStatusReporter(), + outer.getInputSplit()); + Class wrappedMapperClass = Class.forName("org.apache.hadoop.mapreduce.lib.map.WrappedMapper"); + Method getMapContext = wrappedMapperClass.getMethod("getMapContext", MapContext.class); + subcontext = (Context) getMapContext.invoke(wrappedMapperClass.newInstance(), mc); + } catch (Exception ee) { + // rethrow as IOE + throw new IOException(e); + } } }