Index: shims/src/20S/java/org/apache/hcatalog/shims/HCatHadoopShims20S.java
===================================================================
--- shims/src/20S/java/org/apache/hcatalog/shims/HCatHadoopShims20S.java	(revision 0)
+++ shims/src/20S/java/org/apache/hcatalog/shims/HCatHadoopShims20S.java	(revision 0)
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.shims;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+public class HCatHadoopShims20S implements HCatHadoopShims {
+
+    @Override
+    public TaskAttemptContext createTaskAttemptContext(Configuration conf,
+            TaskAttemptID taskId) {
+        return new TaskAttemptContext(conf, taskId);
+    }
+
+    @Override
+    public JobContext createJobContext(Configuration conf,
+            JobID jobId) {
+        return new JobContext(conf, jobId);
+    }
+}
Index: shims/src/23/java/org/apache/hcatalog/shims/HCatHadoopShims23.java
===================================================================
--- shims/src/23/java/org/apache/hcatalog/shims/HCatHadoopShims23.java	(revision 0)
+++ shims/src/23/java/org/apache/hcatalog/shims/HCatHadoopShims23.java	(revision 0)
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.shims;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+
+public class HCatHadoopShims23 implements HCatHadoopShims {
+
+    @Override
+    public TaskAttemptContext createTaskAttemptContext(Configuration conf,
+            TaskAttemptID taskId) {
+        return new TaskAttemptContextImpl(conf, taskId);
+    }
+
+    @Override
+    public JobContext createJobContext(Configuration conf,
+            JobID jobId) {
+        JobContext newContext = new JobContextImpl(conf, jobId);
+        return newContext;
+    }
+
+}
Index: build-common.xml
===================================================================
--- build-common.xml	(revision 1230859)
+++ build-common.xml	(working copy)
@@ -11,7 +11,23 @@
@@ -22,8 +38,14 @@
@@ -35,5 +57,4 @@
Index: src/test/org/apache/hcatalog/rcfile/TestRCFileMapReduceInputFormat.java
===================================================================
--- src/test/org/apache/hcatalog/rcfile/TestRCFileMapReduceInputFormat.java	(revision 1230859)
+++ src/test/org/apache/hcatalog/rcfile/TestRCFileMapReduceInputFormat.java	(working copy)
@@ -45,6 +45,7 @@
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hcatalog.rcfile.RCFileMapReduceInputFormat;
+import org.apache.hcatalog.shims.HCatHadoopShims;
 
 /**
  * TestRCFile.
@@ -232,7 +233,7 @@
     assertEquals("splits length should be " + splitNumber, splits.size(), splitNumber);
     int readCount = 0;
     for (int i = 0; i < splits.size(); i++) {
-      TaskAttemptContext tac = new TaskAttemptContext(jonconf, new TaskAttemptID());
+      TaskAttemptContext tac = HCatHadoopShims.Instance.get().createTaskAttemptContext(jonconf, new TaskAttemptID());
       RecordReader rr = inputFormat.createRecordReader(splits.get(i), tac);
       rr.initialize(splits.get(i), tac);
       while (rr.nextKeyValue()) {
Index: src/test/org/apache/hcatalog/rcfile/TestRCFileInputStorageDriver.java
===================================================================
--- src/test/org/apache/hcatalog/rcfile/TestRCFileInputStorageDriver.java	(revision 1230859)
+++ src/test/org/apache/hcatalog/rcfile/TestRCFileInputStorageDriver.java	(working copy)
@@ -48,12 +48,14 @@
 import org.apache.hcatalog.data.HCatRecord;
 import org.apache.hcatalog.data.schema.HCatSchema;
 import org.apache.hcatalog.rcfile.RCFileInputDriver;
+import org.apache.hcatalog.shims.HCatHadoopShims;
 
 public class TestRCFileInputStorageDriver extends TestCase{
   private static final Configuration conf = new Configuration();
   private static final Path dir = new Path(System.getProperty("test.data.dir", ".") + "/mapred");
   private static final Path file = new Path(dir, "test_rcfile");
+  private final HCatHadoopShims shim = HCatHadoopShims.Instance.get();
 
   // Generate sample records to compare against
   private byte[][][] getRecords() throws UnsupportedEncodingException {
@@ -99,7 +101,7 @@
 
     HCatSchema schema = buildHiveSchema();
     RCFileInputDriver sd = new RCFileInputDriver();
-    JobContext jc = new JobContext(conf, new JobID());
+    JobContext jc = shim.createJobContext(conf, new JobID());
     sd.setInputPath(jc, file.toString());
     InputFormat iF = sd.getInputFormat(null);
     InputSplit split = iF.getSplits(jc).get(0);
@@ -107,7 +109,7 @@
     sd.setOutputSchema(jc, schema);
     sd.initialize(jc, getProps());
 
-    TaskAttemptContext tac = new TaskAttemptContext(conf, new TaskAttemptID());
+    TaskAttemptContext tac = shim.createTaskAttemptContext(conf, new TaskAttemptID());
     RecordReader rr = iF.createRecordReader(split,tac);
     rr.initialize(split, tac);
     HCatRecord[] tuples = getExpectedRecords();
@@ -125,7 +127,7 @@
     BytesRefArrayWritable[] bytesArr = initTestEnvironment();
 
     RCFileInputDriver sd = new RCFileInputDriver();
-    JobContext jc = new JobContext(conf, new JobID());
+    JobContext jc = shim.createJobContext(conf, new JobID());
     sd.setInputPath(jc, file.toString());
     InputFormat iF = sd.getInputFormat(null);
     InputSplit split = iF.getSplits(jc).get(0);
@@ -134,7 +136,7 @@
     sd.initialize(jc, getProps());
     conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR,jc.getConfiguration().get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
 
-    TaskAttemptContext tac = new TaskAttemptContext(conf, new TaskAttemptID());
+    TaskAttemptContext tac = shim.createTaskAttemptContext(conf, new TaskAttemptID());
     RecordReader rr = iF.createRecordReader(split,tac);
     rr.initialize(split, tac);
     HCatRecord[] tuples = getPrunedRecords();
@@ -154,7 +156,7 @@
     BytesRefArrayWritable[] bytesArr = initTestEnvironment();
 
     RCFileInputDriver sd = new RCFileInputDriver();
-    JobContext jc = new JobContext(conf, new JobID());
+    JobContext jc = shim.createJobContext(conf, new JobID());
     sd.setInputPath(jc, file.toString());
     InputFormat iF = sd.getInputFormat(null);
     InputSplit split = iF.getSplits(jc).get(0);
@@ -166,7 +168,7 @@
     map.put("part1", "first-part");
     sd.setPartitionValues(jc, map);
     conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR,jc.getConfiguration().get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
-    TaskAttemptContext tac = new TaskAttemptContext(conf, new TaskAttemptID());
+    TaskAttemptContext tac = shim.createTaskAttemptContext(conf, new TaskAttemptID());
     RecordReader rr = iF.createRecordReader(split,tac);
     rr.initialize(split, tac);
     HCatRecord[] tuples = getReorderedCols();
Index: src/test/org/apache/hcatalog/rcfile/TestRCFileOutputStorageDriver.java
===================================================================
--- src/test/org/apache/hcatalog/rcfile/TestRCFileOutputStorageDriver.java	(revision 1230859)
+++ src/test/org/apache/hcatalog/rcfile/TestRCFileOutputStorageDriver.java	(working copy)
@@ -30,7 +30,6 @@
 import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hcatalog.common.ErrorType;
 import org.apache.hcatalog.common.HCatConstants;
 import org.apache.hcatalog.common.HCatException;
 import org.apache.hcatalog.common.HCatUtil;
@@ -39,14 +38,13 @@
 import org.apache.hcatalog.mapreduce.HCatInputStorageDriver;
 import org.apache.hcatalog.mapreduce.HCatOutputStorageDriver;
 import org.apache.hcatalog.mapreduce.OutputJobInfo;
-import org.apache.hcatalog.rcfile.RCFileInputDriver;
-import org.apache.hcatalog.rcfile.RCFileOutputDriver;
+import org.apache.hcatalog.shims.HCatHadoopShims;
 
 public class TestRCFileOutputStorageDriver extends TestCase {
 
   public void testConversion() throws IOException {
     Configuration conf = new Configuration();
-    JobContext jc = new JobContext(conf, new JobID());
+    JobContext jc = HCatHadoopShims.Instance.get().createJobContext(conf, new JobID());
 
     String jobString = HCatUtil.serialize(OutputJobInfo.create(null,null,null,null,null));
     jc.getConfiguration().set(HCatConstants.HCAT_KEY_OUTPUT_INFO,jobString);
Index: src/java/org/apache/hcatalog/pig/HCatStorer.java
===================================================================
--- src/java/org/apache/hcatalog/pig/HCatStorer.java	(revision 1230859)
+++ src/java/org/apache/hcatalog/pig/HCatStorer.java	(working copy)
@@ -23,17 +23,15 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hcatalog.common.HCatConstants;
 import org.apache.hcatalog.common.HCatException;
 import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.schema.HCatSchema;
 import org.apache.hcatalog.mapreduce.HCatOutputFormat;
-import org.apache.hcatalog.mapreduce.HCatOutputStorageDriver;
 import org.apache.hcatalog.mapreduce.OutputJobInfo;
+import org.apache.hcatalog.shims.HCatHadoopShims;
 import org.apache.pig.PigException;
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.impl.logicalLayer.FrontendException;
@@ -150,7 +148,8 @@
       //In local mode, mapreduce will not call OutputCommitter.cleanupJob.
       //Calling it from here so that the partition publish happens.
       //This call needs to be removed after MAPREDUCE-1447 is fixed.
-      getOutputFormat().getOutputCommitter(new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID())).cleanupJob(job);
+      getOutputFormat().getOutputCommitter(HCatHadoopShims.Instance.get().createTaskAttemptContext(
+          job.getConfiguration(), new TaskAttemptID())).cleanupJob(job);
     } catch (IOException e) {
       throw new IOException("Failed to cleanup job",e);
     } catch (InterruptedException e) {
Index: src/java/org/apache/hcatalog/shims/HCatHadoopShims.java
===================================================================
--- src/java/org/apache/hcatalog/shims/HCatHadoopShims.java	(revision 0)
+++ src/java/org/apache/hcatalog/shims/HCatHadoopShims.java	(revision 0)
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.shims;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+/**
+ * Shim layer to abstract differences between Hadoop 0.20 and 0.23 (HCATALOG-179).
+ * This mirrors Hive shims, but is kept separate for HCatalog dependencies.
+ **/
+public interface HCatHadoopShims {
+
+    public static abstract class Instance {
+        static HCatHadoopShims instance = selectShim();
+        public static HCatHadoopShims get() {
+            return instance;
+        }
+
+        private static HCatHadoopShims selectShim() {
+            // piggyback on Hive's detection logic
+            String major = ShimLoader.getMajorVersion();
+            String shimFQN = "org.apache.hcatalog.shims.HCatHadoopShims20S";
+            if (major.startsWith("0.23")) {
+                shimFQN = "org.apache.hcatalog.shims.HCatHadoopShims23";
+            }
+            try {
+                Class<? extends HCatHadoopShims> clasz =
+                        Class.forName(shimFQN).asSubclass(HCatHadoopShims.class);
+                return clasz.newInstance();
+            } catch (Exception e) {
+                throw new RuntimeException("Failed to instantiate: " + shimFQN, e);
+            }
+        }
+    }
+
+    public TaskAttemptContext createTaskAttemptContext(Configuration conf,
+            TaskAttemptID taskId);
+
+    public JobContext createJobContext(Configuration conf,
+            JobID jobId);
+
+}
Index: src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java
===================================================================
--- src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java	(revision 1230859)
+++ src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java	(working copy)
@@ -47,6 +47,7 @@
 import org.apache.hcatalog.data.schema.HCatSchema;
 import org.apache.hcatalog.data.schema.HCatSchemaUtils;
 import org.apache.hcatalog.har.HarOutputCommitterPostProcessor;
+import org.apache.hcatalog.shims.HCatHadoopShims;
 import org.apache.thrift.TException;
 
 import java.io.IOException;
@@ -146,7 +147,7 @@
       for(HCatOutputStorageDriver baseOsd : storageDriversDiscoveredByPath.values()){
         try {
           baseOsd.abortOutputCommitterJob(
-              new TaskAttemptContext(
+              HCatHadoopShims.Instance.get().createTaskAttemptContext(
                   jobContext.getConfiguration(), TaskAttemptID.forName(ptnRootLocation)
                   ),state);
         } catch (Exception e) {
@@ -256,7 +257,7 @@
       for(HCatOutputStorageDriver baseOsd : storageDriversDiscoveredByPath.values()){
         try {
           baseOsd.cleanupOutputCommitterJob(
-              new TaskAttemptContext(
+              HCatHadoopShims.Instance.get().createTaskAttemptContext(
                   context.getConfiguration(), TaskAttemptID.forName(ptnRootLocation)
                   ));
         } catch (Exception e) {
Index: build.xml
===================================================================
--- build.xml	(revision 1230859)
+++ build.xml	(working copy)
@@ -125,23 +125,17 @@
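
Note for reviewers: a minimal sketch of how calling code picks up the shim introduced by this patch. The class and method names come from HCatHadoopShims above; the example class itself is illustrative and not part of the patch.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.JobID;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.TaskAttemptID;
    import org.apache.hcatalog.shims.HCatHadoopShims;

    // Illustrative only: shows the calling pattern applied throughout this patch.
    public class ShimUsageExample {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // Instance.get() returns HCatHadoopShims20S or HCatHadoopShims23,
            // chosen via ShimLoader.getMajorVersion(), so callers never reference
            // the Hadoop-version-specific context classes directly.
            HCatHadoopShims shims = HCatHadoopShims.Instance.get();
            JobContext jc = shims.createJobContext(conf, new JobID());
            TaskAttemptContext tac = shims.createTaskAttemptContext(conf, new TaskAttemptID());
            System.out.println(jc.getJobID() + " / " + tac.getTaskAttemptID());
        }
    }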