diff --git hcatalog/build-support/ant/deploy.xml hcatalog/build-support/ant/deploy.xml
index b6f9534..3404154 100644
--- hcatalog/build-support/ant/deploy.xml
+++ hcatalog/build-support/ant/deploy.xml
@@ -69,7 +69,7 @@
<_mvnpublish module="testutils" />
-
+
-
@@ -200,7 +199,6 @@
-
-
-
-
-
-
-
-
-
-
-
-
- <_javac srcDir="${basedir}/src/${hadoopversion}/java"
- destDir="${path.to.basedir}/core/build/classes"
- classPathRef="compile.class.path"/>
-
-
diff --git hcatalog/shims/src/20/java/org/apache/hadoop/mapred/TempletonJobTracker.java hcatalog/shims/src/20/java/org/apache/hadoop/mapred/TempletonJobTracker.java
deleted file mode 100644
index 97a8e17..0000000
--- hcatalog/shims/src/20/java/org/apache/hadoop/mapred/TempletonJobTracker.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.mapred;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hcatalog.shims.HCatHadoopShims;
-
-/*
- * Communicate with the JobTracker as a specific user.
- */
-public class TempletonJobTracker {
- private JobSubmissionProtocol cnx;
-
- /**
- * Create a connection to the Job Tracker.
- */
- public TempletonJobTracker(Configuration conf)
- throws IOException {
- UserGroupInformation ugi = UserGroupInformation.getLoginUser();
- cnx = (JobSubmissionProtocol)
- RPC.getProxy(JobSubmissionProtocol.class,
- JobSubmissionProtocol.versionID,
- HCatHadoopShims.Instance.get().getAddress(conf),
- ugi,
- conf,
- NetUtils.getSocketFactory(conf,
- JobSubmissionProtocol.class));
- }
-
- /**
- * Grab a handle to a job that is already known to the JobTracker.
- *
- * @return Profile of the job, or null if not found.
- */
- public JobProfile getJobProfile(JobID jobid)
- throws IOException {
- return cnx.getJobProfile(jobid);
- }
-
- /**
- * Grab a handle to a job that is already known to the JobTracker.
- *
- * @return Status of the job, or null if not found.
- */
- public JobStatus getJobStatus(JobID jobid)
- throws IOException {
- return cnx.getJobStatus(jobid);
- }
-
-
- /**
- * Kill a job.
- */
- public void killJob(JobID jobid)
- throws IOException {
- cnx.killJob(jobid);
- }
-
- /**
- * Get all the jobs submitted.
- */
- public JobStatus[] getAllJobs()
- throws IOException {
- return cnx.getAllJobs();
- }
-
- /**
- * Close the connection to the Job Tracker.
- */
- public void close() {
- RPC.stopProxy(cnx);
- }
-}
diff --git hcatalog/shims/src/20/java/org/apache/hcatalog/shims/HCatHadoopShims20S.java hcatalog/shims/src/20/java/org/apache/hcatalog/shims/HCatHadoopShims20S.java
deleted file mode 100644
index 7ab73c1..0000000
--- hcatalog/shims/src/20/java/org/apache/hcatalog/shims/HCatHadoopShims20S.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hcatalog.shims;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobTracker;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.JobStatus.State;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskID;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.util.Progressable;
-
-public class HCatHadoopShims20S implements HCatHadoopShims {
- @Override
- public TaskID createTaskID() {
- return new TaskID();
- }
-
- @Override
- public TaskAttemptID createTaskAttemptID() {
- return new TaskAttemptID();
- }
-
- @Override
- public TaskAttemptContext createTaskAttemptContext(Configuration conf, TaskAttemptID taskId) {
- return new TaskAttemptContext(conf, taskId);
- }
-
- @Override
- public org.apache.hadoop.mapred.TaskAttemptContext createTaskAttemptContext(org.apache.hadoop.mapred.JobConf conf,
- org.apache.hadoop.mapred.TaskAttemptID taskId, Progressable progressable) {
- org.apache.hadoop.mapred.TaskAttemptContext newContext = null;
- try {
- java.lang.reflect.Constructor construct = org.apache.hadoop.mapred.TaskAttemptContext.class.getDeclaredConstructor(
- org.apache.hadoop.mapred.JobConf.class, org.apache.hadoop.mapred.TaskAttemptID.class,
- Progressable.class);
- construct.setAccessible(true);
- newContext = (org.apache.hadoop.mapred.TaskAttemptContext)construct.newInstance(conf, taskId, progressable);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- return newContext;
- }
-
- @Override
- public JobContext createJobContext(Configuration conf,
- JobID jobId) {
- return new JobContext(conf, jobId);
- }
-
- @Override
- public org.apache.hadoop.mapred.JobContext createJobContext(org.apache.hadoop.mapred.JobConf conf,
- org.apache.hadoop.mapreduce.JobID jobId, Progressable progressable) {
- org.apache.hadoop.mapred.JobContext newContext = null;
- try {
- java.lang.reflect.Constructor construct = org.apache.hadoop.mapred.JobContext.class.getDeclaredConstructor(
- org.apache.hadoop.mapred.JobConf.class, org.apache.hadoop.mapreduce.JobID.class,
- Progressable.class);
- construct.setAccessible(true);
- newContext = (org.apache.hadoop.mapred.JobContext)construct.newInstance(conf, jobId, progressable);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- return newContext;
- }
-
- @Override
- public void commitJob(OutputFormat outputFormat, Job job) throws IOException {
- if( job.getConfiguration().get("mapred.job.tracker", "").equalsIgnoreCase("local") ) {
- try {
- //In local mode, mapreduce will not call OutputCommitter.cleanupJob.
- //Calling it from here so that the partition publish happens.
- //This call needs to be removed after MAPREDUCE-1447 is fixed.
- outputFormat.getOutputCommitter(HCatHadoopShims.Instance.get().createTaskAttemptContext(
- job.getConfiguration(), HCatHadoopShims.Instance.get().createTaskAttemptID())).commitJob(job);
- } catch (IOException e) {
- throw new IOException("Failed to cleanup job",e);
- } catch (InterruptedException e) {
- throw new IOException("Failed to cleanup job",e);
- }
- }
- }
-
- @Override
- public void abortJob(OutputFormat outputFormat, Job job) throws IOException {
- if (job.getConfiguration().get("mapred.job.tracker", "")
- .equalsIgnoreCase("local")) {
- try {
- // This call needs to be removed after MAPREDUCE-1447 is fixed.
- outputFormat.getOutputCommitter(HCatHadoopShims.Instance.get().createTaskAttemptContext(
- job.getConfiguration(), new TaskAttemptID())).abortJob(job, State.FAILED);
- } catch (IOException e) {
- throw new IOException("Failed to abort job", e);
- } catch (InterruptedException e) {
- throw new IOException("Failed to abort job", e);
- }
- }
- }
-
- @Override
- public InetSocketAddress getResourceManagerAddress(Configuration conf)
- {
- return JobTracker.getAddress(conf);
- }
-
- @Override
- public String getPropertyName(PropertyName name) {
- switch (name) {
- case CACHE_ARCHIVES:
- return DistributedCache.CACHE_ARCHIVES;
- case CACHE_FILES:
- return DistributedCache.CACHE_FILES;
- case CACHE_SYMLINK:
- return DistributedCache.CACHE_SYMLINK;
- }
-
- return "";
- }
-
- @Override
- public boolean isFileInHDFS(FileSystem fs, Path path) throws IOException {
- // In hadoop 1.x.x the file system URI is sufficient to determine the uri of the file
- return "hdfs".equals(fs.getUri().getScheme());
- }
-
- @Override
- public InetSocketAddress getAddress(Configuration conf) {
- String jobTrackerStr =
- conf.get("mapred.job.tracker", "localhost:8012");
- return NetUtils.createSocketAddr(jobTrackerStr);
- }
-}
diff --git hcatalog/shims/src/23/java/org/apache/hadoop/mapred/TempletonJobTracker.java hcatalog/shims/src/23/java/org/apache/hadoop/mapred/TempletonJobTracker.java
deleted file mode 100644
index 2ad8605..0000000
--- hcatalog/shims/src/23/java/org/apache/hadoop/mapred/TempletonJobTracker.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.mapred;
-
-import java.io.IOException;
-import org.apache.hadoop.conf.Configuration;
-
-/*
- * Communicate with the JobTracker as a specific user.
- */
-public class TempletonJobTracker {
- private JobClient jc;
-
- /**
- * Create a connection to the Job Tracker.
- */
- public TempletonJobTracker(Configuration conf)
- throws IOException {
-
- jc = new JobClient(conf);
- }
-
- /**
- * Grab a handle to a job that is already known to the JobTracker.
- *
- * @return Profile of the job, or null if not found.
- */
- public JobProfile getJobProfile(JobID jobid)
- throws IOException {
- RunningJob rj = jc.getJob(jobid);
- JobStatus jobStatus = rj.getJobStatus();
- JobProfile jobProfile = new JobProfile(jobStatus.getUsername(), jobStatus.getJobID(),
- jobStatus.getJobFile(), jobStatus.getTrackingUrl(), jobStatus.getJobName());
- return jobProfile;
- }
-
- /**
- * Grab a handle to a job that is already known to the JobTracker.
- *
- * @return Status of the job, or null if not found.
- */
- public JobStatus getJobStatus(JobID jobid)
- throws IOException {
- RunningJob rj = jc.getJob(jobid);
- JobStatus jobStatus = rj.getJobStatus();
- return jobStatus;
- }
-
-
- /**
- * Kill a job.
- */
- public void killJob(JobID jobid)
- throws IOException {
- RunningJob rj = jc.getJob(jobid);
- rj.killJob();
- }
-
- /**
- * Get all the jobs submitted.
- */
- public JobStatus[] getAllJobs()
- throws IOException {
- return jc.getAllJobs();
- }
-
- /**
- * Close the connection to the Job Tracker.
- */
- public void close() {
- try {
- jc.close();
- } catch (IOException e) {
- }
- }
-}
diff --git hcatalog/shims/src/23/java/org/apache/hcatalog/shims/HCatHadoopShims23.java hcatalog/shims/src/23/java/org/apache/hcatalog/shims/HCatHadoopShims23.java
deleted file mode 100644
index 764c0c3..0000000
--- hcatalog/shims/src/23/java/org/apache/hcatalog/shims/HCatHadoopShims23.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hcatalog.shims;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskID;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.task.JobContextImpl;
-import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileSystem;
-
-
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.net.NetUtils;
-
-public class HCatHadoopShims23 implements HCatHadoopShims {
- @Override
- public TaskID createTaskID() {
- return new TaskID("", 0, TaskType.MAP, 0);
- }
-
- @Override
- public TaskAttemptID createTaskAttemptID() {
- return new TaskAttemptID("", 0, TaskType.MAP, 0, 0);
- }
-
- @Override
- public org.apache.hadoop.mapreduce.TaskAttemptContext createTaskAttemptContext(Configuration conf,
- org.apache.hadoop.mapreduce.TaskAttemptID taskId) {
- return new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(
- conf instanceof JobConf? new JobConf(conf) : conf,
- taskId);
- }
-
- @Override
- public org.apache.hadoop.mapred.TaskAttemptContext createTaskAttemptContext(org.apache.hadoop.mapred.JobConf conf,
- org.apache.hadoop.mapred.TaskAttemptID taskId, Progressable progressable) {
- org.apache.hadoop.mapred.TaskAttemptContext newContext = null;
- try {
- java.lang.reflect.Constructor construct = org.apache.hadoop.mapred.TaskAttemptContextImpl.class.getDeclaredConstructor(
- org.apache.hadoop.mapred.JobConf.class, org.apache.hadoop.mapred.TaskAttemptID.class,
- Reporter.class);
- construct.setAccessible(true);
- newContext = (org.apache.hadoop.mapred.TaskAttemptContext) construct.newInstance(
- new JobConf(conf), taskId, (Reporter) progressable);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- return newContext;
- }
-
- @Override
- public JobContext createJobContext(Configuration conf,
- JobID jobId) {
- return new JobContextImpl(conf instanceof JobConf? new JobConf(conf) : conf,
- jobId);
- }
-
- @Override
- public org.apache.hadoop.mapred.JobContext createJobContext(org.apache.hadoop.mapred.JobConf conf,
- org.apache.hadoop.mapreduce.JobID jobId, Progressable progressable) {
- return new org.apache.hadoop.mapred.JobContextImpl(
- new JobConf(conf), jobId, (org.apache.hadoop.mapred.Reporter) progressable);
- }
-
- @Override
- public void commitJob(OutputFormat outputFormat, Job job) throws IOException {
- // Do nothing as this was fixed by MAPREDUCE-1447.
- }
-
- @Override
- public void abortJob(OutputFormat outputFormat, Job job) throws IOException {
- // Do nothing as this was fixed by MAPREDUCE-1447.
- }
-
- @Override
- public InetSocketAddress getResourceManagerAddress(Configuration conf) {
- String addr = conf.get("yarn.resourcemanager.address", "localhost:8032");
-
- return NetUtils.createSocketAddr(addr);
- }
-
- @Override
- public String getPropertyName(PropertyName name) {
- switch (name) {
- case CACHE_ARCHIVES:
- return MRJobConfig.CACHE_ARCHIVES;
- case CACHE_FILES:
- return MRJobConfig.CACHE_FILES;
- case CACHE_SYMLINK:
- return MRJobConfig.CACHE_SYMLINK;
- }
-
- return "";
- }
-
- @Override
- public boolean isFileInHDFS(FileSystem fs, Path path) throws IOException {
- // In case of viewfs we need to lookup where the actual file is to know the filesystem in use.
- // resolvePath is a sure shot way of knowing which file system the file is.
- return "hdfs".equals(fs.resolvePath(path).toUri().getScheme());
- }
-
- @Override
- public InetSocketAddress getAddress(Configuration conf) {
- return null;
- }
-}
diff --git hcatalog/webhcat/svr/pom.xml hcatalog/webhcat/svr/pom.xml
index edbca90..d34724e 100644
--- hcatalog/webhcat/svr/pom.xml
+++ hcatalog/webhcat/svr/pom.xml
@@ -36,7 +36,18 @@
<url>http://maven.apache.org</url>
-
+
@@ -74,12 +85,6 @@
       <scope>compile</scope>
     </dependency>
     <dependency>
-      <groupId>com.sun.jersey</groupId>
-      <artifactId>jersey-json</artifactId>
-      <version>${jersey.version}</version>
-      <scope>compile</scope>
-    </dependency>
-    <dependency>
       <groupId>org.codehaus.jackson</groupId>
       <artifactId>jackson-core-asl</artifactId>
       <version>${jackson.version}</version>
diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/DeleteDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/DeleteDelegator.java
index af7b2bd..6bff4e9 100644
--- hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/DeleteDelegator.java
+++ hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/DeleteDelegator.java
@@ -19,8 +19,10 @@
package org.apache.hcatalog.templeton;
import java.io.IOException;
+
+import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJTShim;
+import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.TempletonJobTracker;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hcatalog.templeton.tool.JobState;
@@ -36,10 +38,10 @@ public QueueStatusBean run(String user, String id)
throws NotAuthorizedException, BadParam, IOException, InterruptedException
{
UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
- TempletonJobTracker tracker = null;
+ WebHCatJTShim tracker = null;
JobState state = null;
try {
- tracker = new TempletonJobTracker(appConf);
+ tracker = ShimLoader.getHadoopShims().getWebHCatShim(appConf);
JobID jobid = StatusDelegator.StringToJobID(id);
if (jobid == null)
throw new BadParam("Invalid jobid: " + id);
diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/ListDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/ListDelegator.java
index c963953..a4befc9 100644
--- hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/ListDelegator.java
+++ hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/ListDelegator.java
@@ -22,8 +22,9 @@
import java.util.List;
import java.util.ArrayList;
+import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJTShim;
+import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.mapred.JobStatus;
-import org.apache.hadoop.mapred.TempletonJobTracker;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hcatalog.templeton.tool.JobState;
@@ -39,9 +40,9 @@ public ListDelegator(AppConfig appConf) {
throws NotAuthorizedException, BadParam, IOException, InterruptedException {
UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
- TempletonJobTracker tracker = null;
+ WebHCatJTShim tracker = null;
try {
- tracker = new TempletonJobTracker(appConf);
+ tracker = ShimLoader.getHadoopShims().getWebHCatShim(appConf);
ArrayList<String> ids = new ArrayList<String>();
diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/StatusDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/StatusDelegator.java
index 27f80d3..4183906 100644
--- hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/StatusDelegator.java
+++ hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/StatusDelegator.java
@@ -22,10 +22,11 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJTShim;
+import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.JobProfile;
import org.apache.hadoop.mapred.JobStatus;
-import org.apache.hadoop.mapred.TempletonJobTracker;
import org.apache.hcatalog.templeton.tool.JobState;
/**
@@ -41,10 +42,10 @@ public StatusDelegator(AppConfig appConf) {
public QueueStatusBean run(String user, String id)
throws NotAuthorizedException, BadParam, IOException, InterruptedException
{
- TempletonJobTracker tracker = null;
+ WebHCatJTShim tracker = null;
JobState state = null;
try {
- tracker = new TempletonJobTracker(appConf);
+ tracker = ShimLoader.getHadoopShims().getWebHCatShim(appConf);
JobID jobid = StatusDelegator.StringToJobID(id);
if (jobid == null)
throw new BadParam("Invalid jobid: " + id);
@@ -60,7 +61,7 @@ public QueueStatusBean run(String user, String id)
}
}
- public static QueueStatusBean makeStatus(TempletonJobTracker tracker,
+ public static QueueStatusBean makeStatus(WebHCatJTShim tracker,
JobID jobid,
String childid,
JobState state)
@@ -87,7 +88,7 @@ public static QueueStatusBean makeStatus(TempletonJobTracker tracker,
return new QueueStatusBean(state, status, profile);
}
- public static QueueStatusBean makeStatus(TempletonJobTracker tracker,
+ public static QueueStatusBean makeStatus(WebHCatJTShim tracker,
JobID jobid,
JobState state)
throws BadParam, IOException {
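Note: the three delegator changes above (DeleteDelegator, ListDelegator, StatusDelegator) all follow the same pattern: obtain the tracker through the shim layer instead of constructing TempletonJobTracker directly, and close it when done. A minimal sketch of that calling pattern, for reference only; the wrapper class and method below are illustrative and not part of this patch:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJTShim;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.JobStatus;

// Illustrative sketch only; not part of the patch.
public class WebHCatShimUsageSketch {
  public static JobStatus lookupStatus(Configuration conf, String id) throws IOException {
    WebHCatJTShim tracker = null;
    try {
      // ShimLoader picks the version-appropriate implementation
      // (WebHCatJTShim20S or WebHCatJTShim23) based on the Hadoop version on the classpath.
      tracker = ShimLoader.getHadoopShims().getWebHCatShim(conf);
      return tracker.getJobStatus(JobID.forName(id));
    } finally {
      if (tracker != null) {
        tracker.close();
      }
    }
  }
}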
diff --git shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
index d2bb34d..1d9777f 100644
--- shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
+++ shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
@@ -735,5 +735,12 @@ public String getTokenFileLocEnvName() {
throw new UnsupportedOperationException(
"Kerberos not supported in current hadoop version");
}
-
+ @Override
+ public HCatHadoopShims getHCatShim() {
+ throw new UnsupportedOperationException("HCatalog does not support Hadoop 0.20.x");
+ }
+ @Override
+ public WebHCatJTShim getWebHCatShim(Configuration conf) throws IOException {
+ throw new UnsupportedOperationException("WebHCat does not support Hadoop 0.20.x");
+ }
}
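On stock Hadoop 0.20.x both new methods fail fast with UnsupportedOperationException, so code that has to run against that shim should treat the feature as absent. A hedged sketch of such a guard; the helper class is illustrative and not part of this patch:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJTShim;
import org.apache.hadoop.hive.shims.ShimLoader;

// Illustrative sketch only; not part of the patch.
public class WebHCatShimGuard {
  // Returns null when the active Hadoop shim (e.g. Hadoop20Shims) does not support WebHCat.
  public static WebHCatJTShim tryGetTracker(Configuration conf) throws IOException {
    try {
      return ShimLoader.getHadoopShims().getWebHCatShim(conf);
    } catch (UnsupportedOperationException e) {
      return null;
    }
  }
}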
diff --git shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
index 3c93393..84d7ef8 100644
--- shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
+++ shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
@@ -18,21 +18,30 @@
package org.apache.hadoop.hive.shims;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URL;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Trash;
-import org.apache.hadoop.hive.shims.HadoopShimsSecure;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.JobTracker;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TaskLogServlet;
+import org.apache.hadoop.mapred.WebHCatJTShim20S;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
@@ -195,4 +204,129 @@ public void shutdown() {
cluster.shutdown();
}
}
+ private volatile HCatHadoopShims hcatShimInstance;
+ @Override
+ public HCatHadoopShims getHCatShim() {
+ if(hcatShimInstance == null) {
+ hcatShimInstance = new HCatHadoopShims20S();
+ }
+ return hcatShimInstance;
+ }
+ private final class HCatHadoopShims20S implements HCatHadoopShims {
+ @Override
+ public TaskID createTaskID() {
+ return new TaskID();
+ }
+
+ @Override
+ public TaskAttemptID createTaskAttemptID() {
+ return new TaskAttemptID();
+ }
+
+ @Override
+ public TaskAttemptContext createTaskAttemptContext(Configuration conf, TaskAttemptID taskId) {
+ return new TaskAttemptContext(conf, taskId);
+ }
+
+ @Override
+ public org.apache.hadoop.mapred.TaskAttemptContext createTaskAttemptContext(org.apache.hadoop.mapred.JobConf conf,
+ org.apache.hadoop.mapred.TaskAttemptID taskId, Progressable progressable) {
+ org.apache.hadoop.mapred.TaskAttemptContext newContext = null;
+ try {
+ java.lang.reflect.Constructor construct = org.apache.hadoop.mapred.TaskAttemptContext.class.getDeclaredConstructor(
+ org.apache.hadoop.mapred.JobConf.class, org.apache.hadoop.mapred.TaskAttemptID.class,
+ Progressable.class);
+ construct.setAccessible(true);
+ newContext = (org.apache.hadoop.mapred.TaskAttemptContext)construct.newInstance(conf, taskId, progressable);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return newContext;
+ }
+
+ @Override
+ public JobContext createJobContext(Configuration conf,
+ JobID jobId) {
+ return new JobContext(conf, jobId);
+ }
+
+ @Override
+ public org.apache.hadoop.mapred.JobContext createJobContext(org.apache.hadoop.mapred.JobConf conf,
+ org.apache.hadoop.mapreduce.JobID jobId, Progressable progressable) {
+ org.apache.hadoop.mapred.JobContext newContext = null;
+ try {
+ java.lang.reflect.Constructor construct = org.apache.hadoop.mapred.JobContext.class.getDeclaredConstructor(
+ org.apache.hadoop.mapred.JobConf.class, org.apache.hadoop.mapreduce.JobID.class,
+ Progressable.class);
+ construct.setAccessible(true);
+ newContext = (org.apache.hadoop.mapred.JobContext)construct.newInstance(conf, jobId, progressable);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return newContext;
+ }
+
+ @Override
+ public void commitJob(OutputFormat outputFormat, Job job) throws IOException {
+ if( job.getConfiguration().get("mapred.job.tracker", "").equalsIgnoreCase("local") ) {
+ try {
+ //In local mode, mapreduce will not call OutputCommitter.cleanupJob.
+ //Calling it from here so that the partition publish happens.
+ //This call needs to be removed after MAPREDUCE-1447 is fixed.
+ outputFormat.getOutputCommitter(createTaskAttemptContext(
+ job.getConfiguration(), createTaskAttemptID())).commitJob(job);
+ } catch (IOException e) {
+ throw new IOException("Failed to cleanup job",e);
+ } catch (InterruptedException e) {
+ throw new IOException("Failed to cleanup job",e);
+ }
+ }
+ }
+
+ @Override
+ public void abortJob(OutputFormat outputFormat, Job job) throws IOException {
+ if (job.getConfiguration().get("mapred.job.tracker", "")
+ .equalsIgnoreCase("local")) {
+ try {
+ // This call needs to be removed after MAPREDUCE-1447 is fixed.
+ outputFormat.getOutputCommitter(createTaskAttemptContext(
+ job.getConfiguration(), new TaskAttemptID())).abortJob(job, JobStatus.State.FAILED);
+ } catch (IOException e) {
+ throw new IOException("Failed to abort job", e);
+ } catch (InterruptedException e) {
+ throw new IOException("Failed to abort job", e);
+ }
+ }
+ }
+
+ @Override
+ public InetSocketAddress getResourceManagerAddress(Configuration conf)
+ {
+ return JobTracker.getAddress(conf);
+ }
+
+ @Override
+ public String getPropertyName(PropertyName name) {
+ switch (name) {
+ case CACHE_ARCHIVES:
+ return DistributedCache.CACHE_ARCHIVES;
+ case CACHE_FILES:
+ return DistributedCache.CACHE_FILES;
+ case CACHE_SYMLINK:
+ return DistributedCache.CACHE_SYMLINK;
+ }
+
+ return "";
+ }
+
+ @Override
+ public boolean isFileInHDFS(FileSystem fs, Path path) throws IOException {
+ // In Hadoop 1.x the file system URI is sufficient to determine the file system the file is on
+ return "hdfs".equals(fs.getUri().getScheme());
+ }
+ }
+ @Override
+ public WebHCatJTShim getWebHCatShim(Configuration conf) throws IOException {
+ return new WebHCatJTShim20S(conf); // this is stateful, so it can't be cached
+ }
}
diff --git shims/src/0.20S/java/org/apache/hadoop/mapred/WebHCatJTShim20S.java shims/src/0.20S/java/org/apache/hadoop/mapred/WebHCatJTShim20S.java
new file mode 100644
index 0000000..16b3eaf
--- /dev/null
+++ shims/src/0.20S/java/org/apache/hadoop/mapred/WebHCatJTShim20S.java
@@ -0,0 +1,83 @@
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJTShim;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+/**
+ * This class lives in the org.apache.hadoop.mapred package because it relies on
+ * JobSubmissionProtocol, which is package-private.
+ */
+public class WebHCatJTShim20S implements WebHCatJTShim {
+ private JobSubmissionProtocol cnx;
+
+ /**
+ * Create a connection to the Job Tracker.
+ */
+ public WebHCatJTShim20S(Configuration conf)
+ throws IOException {
+ UserGroupInformation ugi = UserGroupInformation.getLoginUser();
+ cnx = (JobSubmissionProtocol)
+ RPC.getProxy(JobSubmissionProtocol.class,
+ JobSubmissionProtocol.versionID,
+ getAddress(conf),
+ ugi,
+ conf,
+ NetUtils.getSocketFactory(conf,
+ JobSubmissionProtocol.class));
+ }
+
+ /**
+ * Grab a handle to a job that is already known to the JobTracker.
+ *
+ * @return Profile of the job, or null if not found.
+ */
+ public JobProfile getJobProfile(org.apache.hadoop.mapred.JobID jobid)
+ throws IOException {
+ return cnx.getJobProfile(jobid);
+ }
+
+ /**
+ * Grab a handle to a job that is already known to the JobTracker.
+ *
+ * @return Status of the job, or null if not found.
+ */
+ public org.apache.hadoop.mapred.JobStatus getJobStatus(org.apache.hadoop.mapred.JobID jobid)
+ throws IOException {
+ return cnx.getJobStatus(jobid);
+ }
+
+
+ /**
+ * Kill a job.
+ */
+ public void killJob(org.apache.hadoop.mapred.JobID jobid)
+ throws IOException {
+ cnx.killJob(jobid);
+ }
+
+ /**
+ * Get all the jobs submitted.
+ */
+ public org.apache.hadoop.mapred.JobStatus[] getAllJobs()
+ throws IOException {
+ return cnx.getAllJobs();
+ }
+
+ /**
+ * Close the connection to the Job Tracker.
+ */
+ public void close() {
+ RPC.stopProxy(cnx);
+ }
+ private InetSocketAddress getAddress(Configuration conf) {
+ String jobTrackerStr = conf.get("mapred.job.tracker", "localhost:8012");
+ return NetUtils.createSocketAddr(jobTrackerStr);
+ }
+}
diff --git shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
index 62d7fc6..8d2f0b3 100644
--- shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
+++ shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
@@ -19,6 +19,7 @@
import java.io.IOException;
import java.lang.Integer;
+import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Map;
@@ -28,17 +29,24 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Trash;
-import org.apache.hadoop.hive.shims.HadoopShims.JobTrackerState;
-import org.apache.hadoop.hive.shims.HadoopShimsSecure;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.WebHCatJTShim23;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.mapreduce.util.HostUtil;
+import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
@@ -230,4 +238,104 @@ public void shutdown() {
cluster.shutdown();
}
}
+ private volatile HCatHadoopShims hcatShimInstance;
+ @Override
+ public HCatHadoopShims getHCatShim() {
+ if(hcatShimInstance == null) {
+ hcatShimInstance = new HCatHadoopShims23();
+ }
+ return hcatShimInstance;
+ }
+ private final class HCatHadoopShims23 implements HCatHadoopShims {
+ @Override
+ public TaskID createTaskID() {
+ return new TaskID("", 0, TaskType.MAP, 0);
+ }
+
+ @Override
+ public TaskAttemptID createTaskAttemptID() {
+ return new TaskAttemptID("", 0, TaskType.MAP, 0, 0);
+ }
+
+ @Override
+ public org.apache.hadoop.mapreduce.TaskAttemptContext createTaskAttemptContext(Configuration conf,
+ org.apache.hadoop.mapreduce.TaskAttemptID taskId) {
+ return new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(
+ conf instanceof JobConf? new JobConf(conf) : conf,
+ taskId);
+ }
+
+ @Override
+ public org.apache.hadoop.mapred.TaskAttemptContext createTaskAttemptContext(org.apache.hadoop.mapred.JobConf conf,
+ org.apache.hadoop.mapred.TaskAttemptID taskId, Progressable progressable) {
+ org.apache.hadoop.mapred.TaskAttemptContext newContext = null;
+ try {
+ java.lang.reflect.Constructor construct = org.apache.hadoop.mapred.TaskAttemptContextImpl.class.getDeclaredConstructor(
+ org.apache.hadoop.mapred.JobConf.class, org.apache.hadoop.mapred.TaskAttemptID.class,
+ Reporter.class);
+ construct.setAccessible(true);
+ newContext = (org.apache.hadoop.mapred.TaskAttemptContext) construct.newInstance(
+ new JobConf(conf), taskId, (Reporter) progressable);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return newContext;
+ }
+
+ @Override
+ public JobContext createJobContext(Configuration conf,
+ JobID jobId) {
+ return new JobContextImpl(conf instanceof JobConf? new JobConf(conf) : conf,
+ jobId);
+ }
+
+ @Override
+ public org.apache.hadoop.mapred.JobContext createJobContext(org.apache.hadoop.mapred.JobConf conf,
+ org.apache.hadoop.mapreduce.JobID jobId, Progressable progressable) {
+ return new org.apache.hadoop.mapred.JobContextImpl(
+ new JobConf(conf), jobId, (org.apache.hadoop.mapred.Reporter) progressable);
+ }
+
+ @Override
+ public void commitJob(OutputFormat outputFormat, Job job) throws IOException {
+ // Do nothing as this was fixed by MAPREDUCE-1447.
+ }
+
+ @Override
+ public void abortJob(OutputFormat outputFormat, Job job) throws IOException {
+ // Do nothing as this was fixed by MAPREDUCE-1447.
+ }
+
+ @Override
+ public InetSocketAddress getResourceManagerAddress(Configuration conf) {
+ String addr = conf.get("yarn.resourcemanager.address", "localhost:8032");
+
+ return NetUtils.createSocketAddr(addr);
+ }
+
+ @Override
+ public String getPropertyName(PropertyName name) {
+ switch (name) {
+ case CACHE_ARCHIVES:
+ return MRJobConfig.CACHE_ARCHIVES;
+ case CACHE_FILES:
+ return MRJobConfig.CACHE_FILES;
+ case CACHE_SYMLINK:
+ return MRJobConfig.CACHE_SYMLINK;
+ }
+
+ return "";
+ }
+
+ @Override
+ public boolean isFileInHDFS(FileSystem fs, Path path) throws IOException {
+ // In the case of viewfs we need to look up where the actual file is in order to know the file system in use.
+ // resolvePath is a reliable way of determining which file system the file is on.
+ return "hdfs".equals(fs.resolvePath(path).toUri().getScheme());
+ }
+ }
+ @Override
+ public WebHCatJTShim getWebHCatShim(Configuration conf) throws IOException {
+ return new WebHCatJTShim23(conf); // this is stateful, so it can't be cached
+ }
}
diff --git shims/src/0.23/java/org/apache/hadoop/mapred/WebHCatJTShim23.java shims/src/0.23/java/org/apache/hadoop/mapred/WebHCatJTShim23.java
new file mode 100644
index 0000000..8a3cc59
--- /dev/null
+++ shims/src/0.23/java/org/apache/hadoop/mapred/WebHCatJTShim23.java
@@ -0,0 +1,73 @@
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJTShim;
+
+import java.io.IOException;
+
+public class WebHCatJTShim23 implements WebHCatJTShim {
+ private JobClient jc;
+
+ /**
+ * Create a connection to the Job Tracker.
+ */
+ public WebHCatJTShim23(Configuration conf)
+ throws IOException {
+
+ jc = new JobClient(conf);
+ }
+
+ /**
+ * Grab a handle to a job that is already known to the JobTracker.
+ *
+ * @return Profile of the job, or null if not found.
+ */
+ public JobProfile getJobProfile(JobID jobid)
+ throws IOException {
+ RunningJob rj = jc.getJob(jobid);
+ JobStatus jobStatus = rj.getJobStatus();
+ JobProfile jobProfile = new JobProfile(jobStatus.getUsername(), jobStatus.getJobID(),
+ jobStatus.getJobFile(), jobStatus.getTrackingUrl(), jobStatus.getJobName());
+ return jobProfile;
+ }
+
+ /**
+ * Grab a handle to a job that is already known to the JobTracker.
+ *
+ * @return Status of the job, or null if not found.
+ */
+ public JobStatus getJobStatus(JobID jobid)
+ throws IOException {
+ RunningJob rj = jc.getJob(jobid);
+ JobStatus jobStatus = rj.getJobStatus();
+ return jobStatus;
+ }
+
+
+ /**
+ * Kill a job.
+ */
+ public void killJob(JobID jobid)
+ throws IOException {
+ RunningJob rj = jc.getJob(jobid);
+ rj.killJob();
+ }
+
+ /**
+ * Get all the jobs submitted.
+ */
+ public JobStatus[] getAllJobs()
+ throws IOException {
+ return jc.getAllJobs();
+ }
+
+ /**
+ * Close the connection to the Job Tracker.
+ */
+ public void close() {
+ try {
+ jc.close();
+ } catch (IOException e) {
+   // best-effort close; nothing useful can be done if closing the JobClient fails
+ }
+ }
+}
diff --git shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java
index 30c9fc1..93720b9 100644
--- shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java
+++ shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java
@@ -20,6 +20,7 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
@@ -40,13 +41,19 @@
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobProfile;
+import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable;
@@ -461,4 +468,72 @@ RecordReader getRecordReader(JobConf job, InputSplitShim split, Reporter reporter,
Class<RecordReader<K, V>> rrClass) throws IOException;
}
+ public HCatHadoopShims getHCatShim();
+ public interface HCatHadoopShims {
+
+ enum PropertyName {CACHE_ARCHIVES, CACHE_FILES, CACHE_SYMLINK}
+
+ public TaskID createTaskID();
+
+ public TaskAttemptID createTaskAttemptID();
+
+ public org.apache.hadoop.mapreduce.TaskAttemptContext createTaskAttemptContext(Configuration conf,
+ TaskAttemptID taskId);
+
+ public org.apache.hadoop.mapred.TaskAttemptContext createTaskAttemptContext(JobConf conf,
+ org.apache.hadoop.mapred.TaskAttemptID taskId, Progressable progressable);
+
+ public JobContext createJobContext(Configuration conf, JobID jobId);
+
+ public org.apache.hadoop.mapred.JobContext createJobContext(JobConf conf, JobID jobId, Progressable progressable);
+
+ public void commitJob(OutputFormat outputFormat, Job job) throws IOException;
+
+ public void abortJob(OutputFormat outputFormat, Job job) throws IOException;
+
+ /* Returns the JobTracker address on Hadoop 0.20 and the ResourceManager address on 0.23. */
+ public InetSocketAddress getResourceManagerAddress(Configuration conf);
+
+ public String getPropertyName(PropertyName name);
+
+ /**
+ * Checks whether a file is in an HDFS file system.
+ *
+ * @param fs the file system holding the path
+ * @param path the path of the file to check
+ * @return true if the file is in HDFS, false if it is in another file system.
+ */
+ public boolean isFileInHDFS(FileSystem fs, Path path) throws IOException;
+ }
+ /**
+ * Provides a Hadoop JobTracker shim.
+ * @param conf not {@code null}
+ */
+ public WebHCatJTShim getWebHCatShim(Configuration conf) throws IOException;
+ public interface WebHCatJTShim {
+ /**
+ * Grab a handle to a job that is already known to the JobTracker.
+ *
+ * @return Profile of the job, or null if not found.
+ */
+ public JobProfile getJobProfile(org.apache.hadoop.mapred.JobID jobid) throws IOException;
+ /**
+ * Grab a handle to a job that is already known to the JobTracker.
+ *
+ * @return Status of the job, or null if not found.
+ */
+ public JobStatus getJobStatus(org.apache.hadoop.mapred.JobID jobid) throws IOException;
+ /**
+ * Kill a job.
+ */
+ public void killJob(org.apache.hadoop.mapred.JobID jobid) throws IOException;
+ /**
+ * Get all the jobs submitted.
+ */
+ public JobStatus[] getAllJobs() throws IOException;
+ /**
+ * Close the connection to the Job Tracker.
+ */
+ public void close();
+ }
}
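For context, HCatalog code that previously went through HCatHadoopShims.Instance.get() (see the deleted HCatHadoopShims20S/HCatHadoopShims23 above) can reach the same functionality through ShimLoader and the new HadoopShims.HCatHadoopShims interface. A minimal hedged sketch of that usage; the wrapper class is illustrative and not part of this patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.shims.HadoopShims.HCatHadoopShims;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;

// Illustrative sketch only; not part of the patch.
public class HCatShimUsageSketch {
  public static TaskAttemptContext newAttemptContext(Configuration conf) {
    // The shim hides the Hadoop 1.x vs 2.x API differences (direct constructors vs. *Impl classes).
    HCatHadoopShims shims = ShimLoader.getHadoopShims().getHCatShim();
    TaskAttemptID attemptId = shims.createTaskAttemptID();
    return shims.createTaskAttemptContext(conf, attemptId);
  }
}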