diff --git hcatalog/build.properties hcatalog/build.properties index a7ded31..105b8a8 100644 --- hcatalog/build.properties +++ hcatalog/build.properties @@ -63,13 +63,6 @@ javac.version=1.6 javac.args= javac.args.warnings= -# hive properties -#shims.name=20 -shims.20S.hive.shims.include=0.20,0.20S -shims.20S.hadoop.version=${hive.hadoop-0.20S.version} -shims.23.hive.shims.include=0.23 -shims.23.hadoop.version=${hive.hadoop-0.23.version} - ############################################################################### # deploy properties # diff --git hcatalog/core/src/main/java/org/apache/hcatalog/shims/HCatHadoopShims.java hcatalog/core/src/main/java/org/apache/hcatalog/shims/HCatHadoopShims.java index 6ce9924..7f816ab 100644 --- hcatalog/core/src/main/java/org/apache/hcatalog/shims/HCatHadoopShims.java +++ hcatalog/core/src/main/java/org/apache/hcatalog/shims/HCatHadoopShims.java @@ -101,6 +101,9 @@ private static HCatHadoopShims selectShim() { */ public boolean isFileInHDFS(FileSystem fs, Path path) throws IOException; - public InetSocketAddress getAddress(Configuration conf); - + /** + * Provides a handle on a hadoop job. + * @param conf not null + */ + public TempletonJobTracker getTempletonJobTracker(Configuration conf) throws IOException; } diff --git hcatalog/core/src/main/java/org/apache/hcatalog/shims/TempletonJobTracker.java hcatalog/core/src/main/java/org/apache/hcatalog/shims/TempletonJobTracker.java new file mode 100644 index 0000000..f7f886b --- /dev/null +++ hcatalog/core/src/main/java/org/apache/hcatalog/shims/TempletonJobTracker.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hcatalog.shims; + +import org.apache.hadoop.mapred.JobID; +import org.apache.hadoop.mapred.JobProfile; +import org.apache.hadoop.mapred.JobStatus; + +import java.io.IOException; + +/** + * Communicate with the JobTracker as a specific user. + */ +public interface TempletonJobTracker { + /** + * Grab a handle to a job that is already known to the JobTracker. + * + * @return Profile of the job, or null if not found. + */ + public JobProfile getJobProfile(JobID jobid) throws IOException; + /** + * Grab a handle to a job that is already known to the JobTracker. + * + * @return Status of the job, or null if not found. + */ + public JobStatus getJobStatus(JobID jobid) throws IOException; + /** + * Kill a job. + */ + public void killJob(JobID jobid) throws IOException; + /** + * Get all the jobs submitted. + */ + public JobStatus[] getAllJobs() throws IOException; + /** + * Close the connection to the Job Tracker. + */ + public void close(); +} diff --git hcatalog/shims/src/20/java/org/apache/hadoop/mapred/TempletonJobTracker.java hcatalog/shims/src/20/java/org/apache/hadoop/mapred/TempletonJobTracker.java deleted file mode 100644 index 97a8e17..0000000 --- hcatalog/shims/src/20/java/org/apache/hadoop/mapred/TempletonJobTracker.java +++ /dev/null @@ -1,94 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hadoop.mapred; - -import java.io.IOException; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hcatalog.shims.HCatHadoopShims; - -/* - * Communicate with the JobTracker as a specific user. - */ -public class TempletonJobTracker { - private JobSubmissionProtocol cnx; - - /** - * Create a connection to the Job Tracker. - */ - public TempletonJobTracker(Configuration conf) - throws IOException { - UserGroupInformation ugi = UserGroupInformation.getLoginUser(); - cnx = (JobSubmissionProtocol) - RPC.getProxy(JobSubmissionProtocol.class, - JobSubmissionProtocol.versionID, - HCatHadoopShims.Instance.get().getAddress(conf), - ugi, - conf, - NetUtils.getSocketFactory(conf, - JobSubmissionProtocol.class)); - } - - /** - * Grab a handle to a job that is already known to the JobTracker. - * - * @return Profile of the job, or null if not found. - */ - public JobProfile getJobProfile(JobID jobid) - throws IOException { - return cnx.getJobProfile(jobid); - } - - /** - * Grab a handle to a job that is already known to the JobTracker. - * - * @return Status of the job, or null if not found. - */ - public JobStatus getJobStatus(JobID jobid) - throws IOException { - return cnx.getJobStatus(jobid); - } - - - /** - * Kill a job. - */ - public void killJob(JobID jobid) - throws IOException { - cnx.killJob(jobid); - } - - /** - * Get all the jobs submitted. - */ - public JobStatus[] getAllJobs() - throws IOException { - return cnx.getAllJobs(); - } - - /** - * Close the connection to the Job Tracker. - */ - public void close() { - RPC.stopProxy(cnx); - } -} diff --git hcatalog/shims/src/20/java/org/apache/hadoop/mapred/TempletonJobTracker20S.java hcatalog/shims/src/20/java/org/apache/hadoop/mapred/TempletonJobTracker20S.java new file mode 100644 index 0000000..8917e94 --- /dev/null +++ hcatalog/shims/src/20/java/org/apache/hadoop/mapred/TempletonJobTracker20S.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.mapred; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hcatalog.shims.TempletonJobTracker; + +/* + * Communicate with the JobTracker as a specific user. + */ +public class TempletonJobTracker20S implements TempletonJobTracker { + private JobSubmissionProtocol cnx; + + /** + * Create a connection to the Job Tracker. + */ + public TempletonJobTracker20S(Configuration conf) + throws IOException { + UserGroupInformation ugi = UserGroupInformation.getLoginUser(); + cnx = (JobSubmissionProtocol) + RPC.getProxy(JobSubmissionProtocol.class, + JobSubmissionProtocol.versionID, + getAddress(conf), + ugi, + conf, + NetUtils.getSocketFactory(conf, + JobSubmissionProtocol.class)); + } + + /** + * Grab a handle to a job that is already known to the JobTracker. + * + * @return Profile of the job, or null if not found. + */ + public JobProfile getJobProfile(JobID jobid) + throws IOException { + return cnx.getJobProfile(jobid); + } + + /** + * Grab a handle to a job that is already known to the JobTracker. + * + * @return Status of the job, or null if not found. + */ + public JobStatus getJobStatus(JobID jobid) + throws IOException { + return cnx.getJobStatus(jobid); + } + + + /** + * Kill a job. + */ + public void killJob(JobID jobid) + throws IOException { + cnx.killJob(jobid); + } + + /** + * Get all the jobs submitted. + */ + public JobStatus[] getAllJobs() + throws IOException { + return cnx.getAllJobs(); + } + + /** + * Close the connection to the Job Tracker. + */ + public void close() { + RPC.stopProxy(cnx); + } + private InetSocketAddress getAddress(Configuration conf) { + String jobTrackerStr = conf.get("mapred.job.tracker", "localhost:8012"); + return NetUtils.createSocketAddr(jobTrackerStr); + } +} diff --git hcatalog/shims/src/20/java/org/apache/hcatalog/shims/HCatHadoopShims20S.java hcatalog/shims/src/20/java/org/apache/hcatalog/shims/HCatHadoopShims20S.java index 7ab73c1..bb9f20e 100644 --- hcatalog/shims/src/20/java/org/apache/hcatalog/shims/HCatHadoopShims20S.java +++ hcatalog/shims/src/20/java/org/apache/hcatalog/shims/HCatHadoopShims20S.java @@ -27,6 +27,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobTracker; +import org.apache.hadoop.mapred.TempletonJobTracker20S; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.JobID; @@ -35,7 +36,6 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskID; -import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.util.Progressable; public class HCatHadoopShims20S implements HCatHadoopShims { @@ -152,9 +152,7 @@ public boolean isFileInHDFS(FileSystem fs, Path path) throws IOException { } @Override - public InetSocketAddress getAddress(Configuration conf) { - String jobTrackerStr = - conf.get("mapred.job.tracker", "localhost:8012"); - return NetUtils.createSocketAddr(jobTrackerStr); + public TempletonJobTracker getTempletonJobTracker(Configuration conf) throws IOException { + return new TempletonJobTracker20S(conf); } } diff --git hcatalog/shims/src/23/java/org/apache/hadoop/mapred/TempletonJobTracker.java hcatalog/shims/src/23/java/org/apache/hadoop/mapred/TempletonJobTracker.java deleted file mode 100644 index 2ad8605..0000000 --- hcatalog/shims/src/23/java/org/apache/hadoop/mapred/TempletonJobTracker.java +++ /dev/null @@ -1,92 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hadoop.mapred; - -import java.io.IOException; -import org.apache.hadoop.conf.Configuration; - -/* - * Communicate with the JobTracker as a specific user. - */ -public class TempletonJobTracker { - private JobClient jc; - - /** - * Create a connection to the Job Tracker. - */ - public TempletonJobTracker(Configuration conf) - throws IOException { - - jc = new JobClient(conf); - } - - /** - * Grab a handle to a job that is already known to the JobTracker. - * - * @return Profile of the job, or null if not found. - */ - public JobProfile getJobProfile(JobID jobid) - throws IOException { - RunningJob rj = jc.getJob(jobid); - JobStatus jobStatus = rj.getJobStatus(); - JobProfile jobProfile = new JobProfile(jobStatus.getUsername(), jobStatus.getJobID(), - jobStatus.getJobFile(), jobStatus.getTrackingUrl(), jobStatus.getJobName()); - return jobProfile; - } - - /** - * Grab a handle to a job that is already known to the JobTracker. - * - * @return Status of the job, or null if not found. - */ - public JobStatus getJobStatus(JobID jobid) - throws IOException { - RunningJob rj = jc.getJob(jobid); - JobStatus jobStatus = rj.getJobStatus(); - return jobStatus; - } - - - /** - * Kill a job. - */ - public void killJob(JobID jobid) - throws IOException { - RunningJob rj = jc.getJob(jobid); - rj.killJob(); - } - - /** - * Get all the jobs submitted. - */ - public JobStatus[] getAllJobs() - throws IOException { - return jc.getAllJobs(); - } - - /** - * Close the connection to the Job Tracker. - */ - public void close() { - try { - jc.close(); - } catch (IOException e) { - } - } -} diff --git hcatalog/shims/src/23/java/org/apache/hadoop/mapred/TempletonJobTracker23.java hcatalog/shims/src/23/java/org/apache/hadoop/mapred/TempletonJobTracker23.java new file mode 100644 index 0000000..0d64925 --- /dev/null +++ hcatalog/shims/src/23/java/org/apache/hadoop/mapred/TempletonJobTracker23.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.mapred; + +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hcatalog.shims.TempletonJobTracker; + +/* + * Communicate with the JobTracker as a specific user. + */ +public class TempletonJobTracker23 implements TempletonJobTracker { + private JobClient jc; + + /** + * Create a connection to the Job Tracker. + */ + public TempletonJobTracker23(Configuration conf) + throws IOException { + + jc = new JobClient(conf); + } + + /** + * Grab a handle to a job that is already known to the JobTracker. + * + * @return Profile of the job, or null if not found. + */ + public JobProfile getJobProfile(JobID jobid) + throws IOException { + RunningJob rj = jc.getJob(jobid); + JobStatus jobStatus = rj.getJobStatus(); + JobProfile jobProfile = new JobProfile(jobStatus.getUsername(), jobStatus.getJobID(), + jobStatus.getJobFile(), jobStatus.getTrackingUrl(), jobStatus.getJobName()); + return jobProfile; + } + + /** + * Grab a handle to a job that is already known to the JobTracker. + * + * @return Status of the job, or null if not found. + */ + public JobStatus getJobStatus(JobID jobid) + throws IOException { + RunningJob rj = jc.getJob(jobid); + JobStatus jobStatus = rj.getJobStatus(); + return jobStatus; + } + + + /** + * Kill a job. + */ + public void killJob(JobID jobid) + throws IOException { + RunningJob rj = jc.getJob(jobid); + rj.killJob(); + } + + /** + * Get all the jobs submitted. + */ + public JobStatus[] getAllJobs() + throws IOException { + return jc.getAllJobs(); + } + + /** + * Close the connection to the Job Tracker. + */ + public void close() { + try { + jc.close(); + } catch (IOException e) { + } + } +} diff --git hcatalog/shims/src/23/java/org/apache/hcatalog/shims/HCatHadoopShims23.java hcatalog/shims/src/23/java/org/apache/hcatalog/shims/HCatHadoopShims23.java index 764c0c3..e97a9ff 100644 --- hcatalog/shims/src/23/java/org/apache/hcatalog/shims/HCatHadoopShims23.java +++ hcatalog/shims/src/23/java/org/apache/hcatalog/shims/HCatHadoopShims23.java @@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.TempletonJobTracker23; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.JobID; @@ -129,7 +130,7 @@ public boolean isFileInHDFS(FileSystem fs, Path path) throws IOException { } @Override - public InetSocketAddress getAddress(Configuration conf) { - return null; + public TempletonJobTracker getTempletonJobTracker(Configuration conf) throws IOException { + return new TempletonJobTracker23(conf); } } diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/DeleteDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/DeleteDelegator.java index af7b2bd..30bb84d 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/DeleteDelegator.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/DeleteDelegator.java @@ -20,8 +20,8 @@ import java.io.IOException; import org.apache.hadoop.mapred.JobID; -import org.apache.hadoop.mapred.TempletonJobTracker; -import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hcatalog.shims.HCatHadoopShims; +import org.apache.hcatalog.shims.TempletonJobTracker; import org.apache.hcatalog.templeton.tool.JobState; /** @@ -35,11 +35,10 @@ public DeleteDelegator(AppConfig appConf) { public QueueStatusBean run(String user, String id) throws NotAuthorizedException, BadParam, IOException, InterruptedException { - UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user); TempletonJobTracker tracker = null; JobState state = null; try { - tracker = new TempletonJobTracker(appConf); + tracker = HCatHadoopShims.Instance.get().getTempletonJobTracker(appConf); JobID jobid = StatusDelegator.StringToJobID(id); if (jobid == null) throw new BadParam("Invalid jobid: " + id); diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/ListDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/ListDelegator.java index c963953..11e3940 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/ListDelegator.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/ListDelegator.java @@ -23,8 +23,8 @@ import java.util.ArrayList; import org.apache.hadoop.mapred.JobStatus; -import org.apache.hadoop.mapred.TempletonJobTracker; -import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hcatalog.shims.HCatHadoopShims; +import org.apache.hcatalog.shims.TempletonJobTracker; import org.apache.hcatalog.templeton.tool.JobState; /** @@ -38,10 +38,9 @@ public ListDelegator(AppConfig appConf) { public List run(String user) throws NotAuthorizedException, BadParam, IOException, InterruptedException { - UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user); TempletonJobTracker tracker = null; try { - tracker = new TempletonJobTracker(appConf); + tracker = HCatHadoopShims.Instance.get().getTempletonJobTracker(appConf); ArrayList ids = new ArrayList(); diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/StatusDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/StatusDelegator.java index 27f80d3..b606e7e 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/StatusDelegator.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/StatusDelegator.java @@ -25,7 +25,8 @@ import org.apache.hadoop.mapred.JobID; import org.apache.hadoop.mapred.JobProfile; import org.apache.hadoop.mapred.JobStatus; -import org.apache.hadoop.mapred.TempletonJobTracker; +import org.apache.hcatalog.shims.HCatHadoopShims; +import org.apache.hcatalog.shims.TempletonJobTracker; import org.apache.hcatalog.templeton.tool.JobState; /** @@ -44,7 +45,7 @@ public QueueStatusBean run(String user, String id) TempletonJobTracker tracker = null; JobState state = null; try { - tracker = new TempletonJobTracker(appConf); + tracker = HCatHadoopShims.Instance.get().getTempletonJobTracker(appConf); JobID jobid = StatusDelegator.StringToJobID(id); if (jobid == null) throw new BadParam("Invalid jobid: " + id);