Index: src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java	(revision 1330051)
+++ src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java	(working copy)
@@ -66,6 +66,7 @@
 import org.apache.hadoop.hbase.io.hfile.ChecksumUtil;
 import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
+import org.apache.hadoop.hbase.mapreduce.MapreduceTestingShim;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
@@ -87,6 +88,7 @@
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.hadoop.mapred.TaskLog;
 import org.apache.zookeeper.KeeperException;
@@ -1330,8 +1332,11 @@
     // Allow the user to override FS URI for this map-reduce cluster to use.
     mrCluster = new MiniMRCluster(servers,
       FS_URI != null ? FS_URI : FileSystem.get(conf).getUri().toString(), 1);
-    mrCluster.getJobTrackerRunner().getJobTracker().getConf().set("mapred.local.dir",
-      conf.get("mapred.local.dir")); // Hadoop MiniMR overwrites this while it should not
+    JobConf jobConf = MapreduceTestingShim.getJobConf(mrCluster);
+    if (jobConf != null) {
+      jobConf.set("mapred.local.dir",
+        conf.get("mapred.local.dir")); // Hadoop MiniMR overwrites this while it should not
+    }
     LOG.info("Mini mapreduce cluster started");
     conf.set("mapred.job.tracker",
       mrCluster.createJobConf().get("mapred.job.tracker"));
Index: src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java	(revision 1330051)
+++ src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java	(working copy)
@@ -18,8 +18,8 @@
 package org.apache.hadoop.hbase.mapreduce;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import java.util.List;
 
@@ -40,9 +40,7 @@
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -144,13 +142,13 @@
     jobConf.setLong(HLogInputFormat.END_TIME_KEY, ts);
 
     // only 1st file is considered, and only its 1st entry is used
-    List<InputSplit> splits = input.getSplits(new JobContext(jobConf, new JobID()));
+    List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
     assertEquals(1, splits.size());
     testSplit(splits.get(0), Bytes.toBytes("1"));
 
     jobConf.setLong(HLogInputFormat.START_TIME_KEY, ts+1);
     jobConf.setLong(HLogInputFormat.END_TIME_KEY, ts1+1);
-    splits = input.getSplits(new JobContext(jobConf, new JobID()));
+    splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
     // both files need to be considered
     assertEquals(2, splits.size());
     // only the 2nd entry from the 1st file is used
@@ -191,7 +189,7 @@
     jobConf.set("mapred.input.dir", logDir.toString());
 
     // make sure both logs are found
-    List<InputSplit> splits = input.getSplits(new JobContext(jobConf, new JobID()));
+    List<InputSplit> splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
     assertEquals(2, splits.size());
 
     // should return exactly one KV
@@ -203,14 +201,14 @@
 
     // set an endtime, the 2nd log file can be ignored completely.
     jobConf.setLong(HLogInputFormat.END_TIME_KEY, secondTs-1);
-    splits = input.getSplits(new JobContext(jobConf, new JobID()));
+    splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
     assertEquals(1, splits.size());
     testSplit(splits.get(0), Bytes.toBytes("1"));
 
     // now set a start time
     jobConf.setLong(HLogInputFormat.END_TIME_KEY, Long.MAX_VALUE);
     jobConf.setLong(HLogInputFormat.START_TIME_KEY, thirdTs);
-    splits = input.getSplits(new JobContext(jobConf, new JobID()));
+    splits = input.getSplits(MapreduceTestingShim.createJobContext(jobConf));
     // both logs need to be considered
     assertEquals(2, splits.size());
     // but both readers skip all edits
@@ -223,7 +221,7 @@
    */
   private void testSplit(InputSplit split, byte[]... columns) throws Exception {
     HLogRecordReader reader = new HLogRecordReader();
-    reader.initialize(split, new TaskAttemptContext(conf, new TaskAttemptID()));
+    reader.initialize(split, MapReduceTestUtil.createDummyMapTaskAttemptContext(conf));
 
     for (byte[] column : columns) {
       assertTrue(reader.nextKeyValue());
Index: src/test/java/org/apache/hadoop/hbase/mapreduce/MapreduceTestingShim.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/mapreduce/MapreduceTestingShim.java	(revision 0)
+++ src/test/java/org/apache/hadoop/hbase/mapreduce/MapreduceTestingShim.java	(revision 0)
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+
+/**
+ * This class provides shims for HBase to interact with both the Hadoop 1.0.x
+ * and the Hadoop 0.23.x series.
+ *
+ * NOTE: No testing was done against 0.22.x or 0.21.x.
+ */
+public abstract class MapreduceTestingShim {
+  private static MapreduceTestingShim instance;
+  private static Class<?>[] emptyParam = new Class<?>[] {};
+
+  static {
+    try {
+      // This class exists in Hadoop 0.22+ but not in 0.20.x/1.x.
+      Class.forName("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl");
+      instance = new MapreduceV2Shim();
+    } catch (Exception e) {
+      instance = new MapreduceV1Shim();
+    }
+  }
+
+  public abstract JobContext newJobContext(Configuration jobConf)
+      throws IOException;
+
+  public abstract JobConf obtainJobConf(MiniMRCluster cluster);
+
+  public static JobContext createJobContext(Configuration jobConf)
+      throws IOException {
+    return instance.newJobContext(jobConf);
+  }
+
+  public static JobConf getJobConf(MiniMRCluster cluster) {
+    return instance.obtainJobConf(cluster);
+  }
+
+  private static class MapreduceV1Shim extends MapreduceTestingShim {
+    public JobContext newJobContext(Configuration jobConf) throws IOException {
+      // Implementing:
+      // return new JobContext(jobConf, new JobID());
+      JobID jobId = new JobID();
+      Constructor<JobContext> c;
+      try {
+        c = JobContext.class.getConstructor(Configuration.class, JobID.class);
+        return c.newInstance(jobConf, jobId);
+      } catch (Exception e) {
+        throw new IllegalStateException(
+            "Failed to instantiate new JobContext(jobConf, new JobID())", e);
+      }
+    }
+
+    public JobConf obtainJobConf(MiniMRCluster cluster) {
+      if (cluster == null) return null;
+      try {
+        Object runner = cluster.getJobTrackerRunner();
+        Method meth = runner.getClass().getDeclaredMethod("getJobTracker", emptyParam);
+        Object tracker = meth.invoke(runner, new Object[] {});
+        Method m = tracker.getClass().getDeclaredMethod("getConf", emptyParam);
+        return (JobConf) m.invoke(tracker, new Object[] {});
+      } catch (NoSuchMethodException nsme) {
+        return null;
+      } catch (InvocationTargetException ite) {
+        return null;
+      } catch (IllegalAccessException iae) {
+        return null;
+      }
+    }
+  }
+
+  private static class MapreduceV2Shim extends MapreduceTestingShim {
+    public JobContext newJobContext(Configuration jobConf) {
+      // Implementing:
+      // return Job.getInstance(jobConf);
+      try {
+        Method m = Job.class.getMethod("getInstance", Configuration.class);
+        // Job.getInstance is static, so invoke with a null receiver.
+        return (JobContext) m.invoke(null, jobConf);
+      } catch (Exception e) {
+        throw new IllegalStateException(
+            "Failed to return from Job.getInstance(jobConf)", e);
+      }
+    }
+
+    public JobConf obtainJobConf(MiniMRCluster cluster) {
+      // The 0.23.x MiniMRCluster exposes no JobTracker whose conf would
+      // need patching, so callers must handle the null return.
+      return null;
+    }
+  }
+
+}
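
For review context, a minimal usage sketch of the new shim, assuming it sits in org.apache.hadoop.hbase.mapreduce next to the tests; ShimUsageSketch and its example() method are hypothetical scaffolding, not part of the patch, and the call sites mirror the hunks above:

package org.apache.hadoop.hbase.mapreduce;

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;

/** Sketch only; shows the shim's two call sites across Hadoop lines. */
public class ShimUsageSketch {
  void example(MiniMRCluster mrCluster) throws Exception {
    Configuration conf = HBaseConfiguration.create();

    // One call site, two behaviors: new JobContext(conf, new JobID())
    // on Hadoop 1.0.x, Job.getInstance(conf) on 0.23.x.
    JobContext ctx = MapreduceTestingShim.createJobContext(conf);
    List<InputSplit> splits = new HLogInputFormat().getSplits(ctx);

    // getJobConf() returns null on 0.23.x, where no JobTracker conf
    // needs its mapred.local.dir restored; callers must check.
    JobConf jobConf = MapreduceTestingShim.getJobConf(mrCluster);
    if (jobConf != null) {
      jobConf.set("mapred.local.dir", conf.get("mapred.local.dir"));
    }
  }
}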
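The dispatch happens once per JVM: the static initializer probes for org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl, which exists from 0.22 onward, and selects MapreduceV2Shim or MapreduceV1Shim accordingly. Both shims then use reflection rather than direct calls because neither expression compiles against the other line: JobContext became an interface in 0.23, so new JobContext(conf, jobId) is 1.x-only, while Job.getInstance(conf) does not exist in 1.0.x.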