Index: shims/src/20S/java/org/apache/hcatalog/shims/HCatHadoopShims20S.java
===================================================================
--- shims/src/20S/java/org/apache/hcatalog/shims/HCatHadoopShims20S.java (revision 0)
+++ shims/src/20S/java/org/apache/hcatalog/shims/HCatHadoopShims20S.java (revision 0)
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.shims;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
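+/** Shim for the Hadoop 0.20.x (security) line, where TaskAttemptContext and JobContext are concrete classes that can be constructed directly. */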
+public class HCatHadoopShims20S implements HCatHadoopShims {
+
+ @Override
+ public TaskAttemptContext createTaskAttemptContext(Configuration conf,
+ TaskAttemptID taskId) {
+ return new TaskAttemptContext(conf, taskId);
+ }
+
+ @Override
+ public JobContext createJobContext(Configuration conf,
+ JobID jobId) {
+ return new JobContext(conf, jobId);
+ }
+}
Index: shims/src/23/java/org/apache/hcatalog/shims/HCatHadoopShims23.java
===================================================================
--- shims/src/23/java/org/apache/hcatalog/shims/HCatHadoopShims23.java (revision 0)
+++ shims/src/23/java/org/apache/hcatalog/shims/HCatHadoopShims23.java (revision 0)
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.shims;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+
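+/** Shim for Hadoop 0.23, where TaskAttemptContext and JobContext are interfaces backed by TaskAttemptContextImpl and JobContextImpl. */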
+public class HCatHadoopShims23 implements HCatHadoopShims {
+
+ @Override
+ public TaskAttemptContext createTaskAttemptContext(Configuration conf,
+ TaskAttemptID taskId) {
+ return new TaskAttemptContextImpl(conf, taskId);
+ }
+
+ @Override
+ public JobContext createJobContext(Configuration conf,
+ JobID jobId) {
+        return new JobContextImpl(conf, jobId);
+ }
+
+}
Index: build-common.xml
===================================================================
--- build-common.xml (revision 1230859)
+++ build-common.xml (working copy)
@@ -11,7 +11,23 @@
+    <!-- 16 added lines (XML content missing from this copy of the patch) -->
@@ -22,8 +38,14 @@
-    <!-- 1 removed line (XML content missing from this copy of the patch) -->
+    <!-- 8 added lines (XML content missing from this copy of the patch) -->
@@ -35,5 +57,4 @@
-    <!-- 1 removed line (XML content missing from this copy of the patch) -->
Index: src/test/org/apache/hcatalog/rcfile/TestRCFileMapReduceInputFormat.java
===================================================================
--- src/test/org/apache/hcatalog/rcfile/TestRCFileMapReduceInputFormat.java (revision 1230859)
+++ src/test/org/apache/hcatalog/rcfile/TestRCFileMapReduceInputFormat.java (working copy)
@@ -45,6 +45,7 @@
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hcatalog.rcfile.RCFileMapReduceInputFormat;
+import org.apache.hcatalog.shims.HCatHadoopShims;
/**
* TestRCFile.
@@ -232,7 +233,7 @@
assertEquals("splits length should be " + splitNumber, splits.size(), splitNumber);
int readCount = 0;
for (int i = 0; i < splits.size(); i++) {
- TaskAttemptContext tac = new TaskAttemptContext(jonconf, new TaskAttemptID());
+ TaskAttemptContext tac = HCatHadoopShims.Instance.get().createTaskAttemptContext(jonconf, new TaskAttemptID());
RecordReader rr = inputFormat.createRecordReader(splits.get(i), tac);
rr.initialize(splits.get(i), tac);
while (rr.nextKeyValue()) {
Index: src/test/org/apache/hcatalog/rcfile/TestRCFileInputStorageDriver.java
===================================================================
--- src/test/org/apache/hcatalog/rcfile/TestRCFileInputStorageDriver.java (revision 1230859)
+++ src/test/org/apache/hcatalog/rcfile/TestRCFileInputStorageDriver.java (working copy)
@@ -48,12 +48,14 @@
import org.apache.hcatalog.data.HCatRecord;
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.rcfile.RCFileInputDriver;
+import org.apache.hcatalog.shims.HCatHadoopShims;
public class TestRCFileInputStorageDriver extends TestCase{
private static final Configuration conf = new Configuration();
private static final Path dir = new Path(System.getProperty("test.data.dir", ".") + "/mapred");
private static final Path file = new Path(dir, "test_rcfile");
+ private final HCatHadoopShims shim = HCatHadoopShims.Instance.get();
// Generate sample records to compare against
private byte[][][] getRecords() throws UnsupportedEncodingException {
@@ -99,7 +101,7 @@
HCatSchema schema = buildHiveSchema();
RCFileInputDriver sd = new RCFileInputDriver();
- JobContext jc = new JobContext(conf, new JobID());
+ JobContext jc = shim.createJobContext(conf, new JobID());
sd.setInputPath(jc, file.toString());
InputFormat<?,?> iF = sd.getInputFormat(null);
InputSplit split = iF.getSplits(jc).get(0);
@@ -107,7 +109,7 @@
sd.setOutputSchema(jc, schema);
sd.initialize(jc, getProps());
- TaskAttemptContext tac = new TaskAttemptContext(conf, new TaskAttemptID());
+ TaskAttemptContext tac = shim.createTaskAttemptContext(conf, new TaskAttemptID());
RecordReader<?,?> rr = iF.createRecordReader(split,tac);
rr.initialize(split, tac);
HCatRecord[] tuples = getExpectedRecords();
@@ -125,7 +127,7 @@
BytesRefArrayWritable[] bytesArr = initTestEnvironment();
RCFileInputDriver sd = new RCFileInputDriver();
- JobContext jc = new JobContext(conf, new JobID());
+ JobContext jc = shim.createJobContext(conf, new JobID());
sd.setInputPath(jc, file.toString());
InputFormat<?,?> iF = sd.getInputFormat(null);
InputSplit split = iF.getSplits(jc).get(0);
@@ -134,7 +136,7 @@
sd.initialize(jc, getProps());
conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR,jc.getConfiguration().get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
- TaskAttemptContext tac = new TaskAttemptContext(conf, new TaskAttemptID());
+ TaskAttemptContext tac = shim.createTaskAttemptContext(conf, new TaskAttemptID());
RecordReader<?,?> rr = iF.createRecordReader(split,tac);
rr.initialize(split, tac);
HCatRecord[] tuples = getPrunedRecords();
@@ -154,7 +156,7 @@
BytesRefArrayWritable[] bytesArr = initTestEnvironment();
RCFileInputDriver sd = new RCFileInputDriver();
- JobContext jc = new JobContext(conf, new JobID());
+ JobContext jc = shim.createJobContext(conf, new JobID());
sd.setInputPath(jc, file.toString());
InputFormat<?,?> iF = sd.getInputFormat(null);
InputSplit split = iF.getSplits(jc).get(0);
@@ -166,7 +168,7 @@
map.put("part1", "first-part");
sd.setPartitionValues(jc, map);
conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR,jc.getConfiguration().get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
- TaskAttemptContext tac = new TaskAttemptContext(conf, new TaskAttemptID());
+ TaskAttemptContext tac = shim.createTaskAttemptContext(conf, new TaskAttemptID());
RecordReader<?,?> rr = iF.createRecordReader(split,tac);
rr.initialize(split, tac);
HCatRecord[] tuples = getReorderedCols();
Index: src/test/org/apache/hcatalog/rcfile/TestRCFileOutputStorageDriver.java
===================================================================
--- src/test/org/apache/hcatalog/rcfile/TestRCFileOutputStorageDriver.java (revision 1230859)
+++ src/test/org/apache/hcatalog/rcfile/TestRCFileOutputStorageDriver.java (working copy)
@@ -30,7 +30,6 @@
import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hcatalog.common.ErrorType;
import org.apache.hcatalog.common.HCatConstants;
import org.apache.hcatalog.common.HCatException;
import org.apache.hcatalog.common.HCatUtil;
@@ -39,14 +38,13 @@
import org.apache.hcatalog.mapreduce.HCatInputStorageDriver;
import org.apache.hcatalog.mapreduce.HCatOutputStorageDriver;
import org.apache.hcatalog.mapreduce.OutputJobInfo;
-import org.apache.hcatalog.rcfile.RCFileInputDriver;
-import org.apache.hcatalog.rcfile.RCFileOutputDriver;
+import org.apache.hcatalog.shims.HCatHadoopShims;
public class TestRCFileOutputStorageDriver extends TestCase {
public void testConversion() throws IOException {
Configuration conf = new Configuration();
- JobContext jc = new JobContext(conf, new JobID());
+ JobContext jc = HCatHadoopShims.Instance.get().createJobContext(conf, new JobID());
String jobString = HCatUtil.serialize(OutputJobInfo.create(null,null,null,null,null));
jc.getConfiguration().set(HCatConstants.HCAT_KEY_OUTPUT_INFO,jobString);
Index: src/java/org/apache/hcatalog/pig/HCatStorer.java
===================================================================
--- src/java/org/apache/hcatalog/pig/HCatStorer.java (revision 1230859)
+++ src/java/org/apache/hcatalog/pig/HCatStorer.java (working copy)
@@ -23,17 +23,15 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hcatalog.common.HCatConstants;
import org.apache.hcatalog.common.HCatException;
import org.apache.hcatalog.common.HCatUtil;
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.mapreduce.HCatOutputFormat;
-import org.apache.hcatalog.mapreduce.HCatOutputStorageDriver;
import org.apache.hcatalog.mapreduce.OutputJobInfo;
+import org.apache.hcatalog.shims.HCatHadoopShims;
import org.apache.pig.PigException;
import org.apache.pig.ResourceSchema;
import org.apache.pig.impl.logicalLayer.FrontendException;
@@ -150,7 +148,8 @@
//In local mode, mapreduce will not call OutputCommitter.cleanupJob.
//Calling it from here so that the partition publish happens.
//This call needs to be removed after MAPREDUCE-1447 is fixed.
- getOutputFormat().getOutputCommitter(new TaskAttemptContext(job.getConfiguration(), new TaskAttemptID())).cleanupJob(job);
+ getOutputFormat().getOutputCommitter(HCatHadoopShims.Instance.get().createTaskAttemptContext(
+ job.getConfiguration(), new TaskAttemptID())).cleanupJob(job);
} catch (IOException e) {
throw new IOException("Failed to cleanup job",e);
} catch (InterruptedException e) {
Index: src/java/org/apache/hcatalog/shims/HCatHadoopShims.java
===================================================================
--- src/java/org/apache/hcatalog/shims/HCatHadoopShims.java (revision 0)
+++ src/java/org/apache/hcatalog/shims/HCatHadoopShims.java (revision 0)
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.shims;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+/**
+ * Shim layer to abstract differences between Hadoop 0.20 and 0.23 (HCATALOG-179).
+ * This mirrors the Hive shims, but is kept separate because of HCatalog's own dependency constraints.
+ */
+public interface HCatHadoopShims {
+
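+    /** Selects and caches the shim implementation matching the Hadoop version on the classpath. */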
+ public static abstract class Instance {
+ static HCatHadoopShims instance = selectShim();
+ public static HCatHadoopShims get() {
+ return instance;
+ }
+
+ private static HCatHadoopShims selectShim() {
+ // piggyback on Hive's detection logic
+ String major = ShimLoader.getMajorVersion();
+ String shimFQN = "org.apache.hcatalog.shims.HCatHadoopShims20S";
+ if (major.startsWith("0.23")) {
+ shimFQN = "org.apache.hcatalog.shims.HCatHadoopShims23";
+ }
+ try {
+            Class<? extends HCatHadoopShims> clasz =
+ Class.forName(shimFQN).asSubclass(HCatHadoopShims.class);
+ return clasz.newInstance();
+ } catch (Exception e) {
+            throw new RuntimeException("Failed to instantiate: " + shimFQN, e);
+ }
+ }
+ }
+
+ public TaskAttemptContext createTaskAttemptContext(Configuration conf,
+ TaskAttemptID taskId);
+
+ public JobContext createJobContext(Configuration conf,
+ JobID jobId);
+
+}
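
Note: the rest of this patch switches call sites from constructing Hadoop context objects directly to going through this shim. The sketch below shows that client-side pattern in isolation; the package and class name are illustrative only and are not part of the patch. The Instance holder resolves the implementation once via reflection, so client code never references the version-specific classes.

// Illustrative usage sketch (not part of the patch): obtaining MapReduce
// context objects through the shim so the same code compiles and runs
// against both Hadoop 0.20S and 0.23.
package org.apache.hcatalog.examples;   // hypothetical package

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hcatalog.shims.HCatHadoopShims;

public class ShimUsageExample {          // hypothetical class name
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        HCatHadoopShims shim = HCatHadoopShims.Instance.get();

        // The shim decides at runtime whether to instantiate the 0.20
        // concrete classes or the 0.23 *Impl classes.
        JobContext jc = shim.createJobContext(conf, new JobID());
        TaskAttemptContext tac = shim.createTaskAttemptContext(conf, new TaskAttemptID());

        System.out.println("job id: " + jc.getJobID());
        System.out.println("task attempt id: " + tac.getTaskAttemptID());
    }
}
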
Index: src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java
===================================================================
--- src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java (revision 1230859)
+++ src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java (working copy)
@@ -47,6 +47,7 @@
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.data.schema.HCatSchemaUtils;
import org.apache.hcatalog.har.HarOutputCommitterPostProcessor;
+import org.apache.hcatalog.shims.HCatHadoopShims;
import org.apache.thrift.TException;
import java.io.IOException;
@@ -146,7 +147,7 @@
for(HCatOutputStorageDriver baseOsd : storageDriversDiscoveredByPath.values()){
try {
baseOsd.abortOutputCommitterJob(
- new TaskAttemptContext(
+ HCatHadoopShims.Instance.get().createTaskAttemptContext(
jobContext.getConfiguration(), TaskAttemptID.forName(ptnRootLocation)
),state);
} catch (Exception e) {
@@ -256,7 +257,7 @@
for(HCatOutputStorageDriver baseOsd : storageDriversDiscoveredByPath.values()){
try {
baseOsd.cleanupOutputCommitterJob(
- new TaskAttemptContext(
+ HCatHadoopShims.Instance.get().createTaskAttemptContext(
context.getConfiguration(), TaskAttemptID.forName(ptnRootLocation)
));
} catch (Exception e) {
Index: build.xml
===================================================================
--- build.xml (revision 1230859)
+++ build.xml (working copy)
@@ -125,23 +125,17 @@
-    <!-- 6 removed lines (XML content missing from this copy of the patch) -->
+    <!-- 9 added lines (XML content missing from this copy of the patch) -->
-    <!-- 1 removed line (XML content missing from this copy of the patch) -->
+    <!-- 12 added lines (XML content missing from this copy of the patch) -->