diff --git hcatalog/build-support/ant/deploy.xml hcatalog/build-support/ant/deploy.xml index b6f9534..3404154 100644 --- hcatalog/build-support/ant/deploy.xml +++ hcatalog/build-support/ant/deploy.xml @@ -69,7 +69,7 @@ <_mvnpublish module="testutils" /> - + - @@ -200,7 +199,6 @@ - - - - - - - - - - - - - <_javac srcDir="${basedir}/src/${hadoopversion}/java" - destDir="${path.to.basedir}/core/build/classes" - classPathRef="compile.class.path"/> - - diff --git hcatalog/shims/src/20/java/org/apache/hadoop/mapred/TempletonJobTracker.java hcatalog/shims/src/20/java/org/apache/hadoop/mapred/TempletonJobTracker.java deleted file mode 100644 index 97a8e17..0000000 --- hcatalog/shims/src/20/java/org/apache/hadoop/mapred/TempletonJobTracker.java +++ /dev/null @@ -1,94 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hadoop.mapred; - -import java.io.IOException; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hcatalog.shims.HCatHadoopShims; - -/* - * Communicate with the JobTracker as a specific user. - */ -public class TempletonJobTracker { - private JobSubmissionProtocol cnx; - - /** - * Create a connection to the Job Tracker. - */ - public TempletonJobTracker(Configuration conf) - throws IOException { - UserGroupInformation ugi = UserGroupInformation.getLoginUser(); - cnx = (JobSubmissionProtocol) - RPC.getProxy(JobSubmissionProtocol.class, - JobSubmissionProtocol.versionID, - HCatHadoopShims.Instance.get().getAddress(conf), - ugi, - conf, - NetUtils.getSocketFactory(conf, - JobSubmissionProtocol.class)); - } - - /** - * Grab a handle to a job that is already known to the JobTracker. - * - * @return Profile of the job, or null if not found. - */ - public JobProfile getJobProfile(JobID jobid) - throws IOException { - return cnx.getJobProfile(jobid); - } - - /** - * Grab a handle to a job that is already known to the JobTracker. - * - * @return Status of the job, or null if not found. - */ - public JobStatus getJobStatus(JobID jobid) - throws IOException { - return cnx.getJobStatus(jobid); - } - - - /** - * Kill a job. - */ - public void killJob(JobID jobid) - throws IOException { - cnx.killJob(jobid); - } - - /** - * Get all the jobs submitted. - */ - public JobStatus[] getAllJobs() - throws IOException { - return cnx.getAllJobs(); - } - - /** - * Close the connection to the Job Tracker. 
- */ - public void close() { - RPC.stopProxy(cnx); - } -} diff --git hcatalog/shims/src/20/java/org/apache/hcatalog/shims/HCatHadoopShims20S.java hcatalog/shims/src/20/java/org/apache/hcatalog/shims/HCatHadoopShims20S.java deleted file mode 100644 index 7ab73c1..0000000 --- hcatalog/shims/src/20/java/org/apache/hcatalog/shims/HCatHadoopShims20S.java +++ /dev/null @@ -1,160 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.hcatalog.shims; - -import java.io.IOException; -import java.net.InetSocketAddress; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.filecache.DistributedCache; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.JobTracker; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.JobID; -import org.apache.hadoop.mapreduce.JobStatus.State; -import org.apache.hadoop.mapreduce.OutputFormat; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.hadoop.mapreduce.TaskID; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.util.Progressable; - -public class HCatHadoopShims20S implements HCatHadoopShims { - @Override - public TaskID createTaskID() { - return new TaskID(); - } - - @Override - public TaskAttemptID createTaskAttemptID() { - return new TaskAttemptID(); - } - - @Override - public TaskAttemptContext createTaskAttemptContext(Configuration conf, TaskAttemptID taskId) { - return new TaskAttemptContext(conf, taskId); - } - - @Override - public org.apache.hadoop.mapred.TaskAttemptContext createTaskAttemptContext(org.apache.hadoop.mapred.JobConf conf, - org.apache.hadoop.mapred.TaskAttemptID taskId, Progressable progressable) { - org.apache.hadoop.mapred.TaskAttemptContext newContext = null; - try { - java.lang.reflect.Constructor construct = org.apache.hadoop.mapred.TaskAttemptContext.class.getDeclaredConstructor( - org.apache.hadoop.mapred.JobConf.class, org.apache.hadoop.mapred.TaskAttemptID.class, - Progressable.class); - construct.setAccessible(true); - newContext = (org.apache.hadoop.mapred.TaskAttemptContext)construct.newInstance(conf, taskId, progressable); - } catch (Exception e) { - throw new RuntimeException(e); - } - return newContext; - } - - @Override - public JobContext createJobContext(Configuration conf, - JobID jobId) { - return new JobContext(conf, jobId); - } - - @Override - public org.apache.hadoop.mapred.JobContext createJobContext(org.apache.hadoop.mapred.JobConf conf, - org.apache.hadoop.mapreduce.JobID jobId, Progressable progressable) { - org.apache.hadoop.mapred.JobContext newContext = null; - try { - java.lang.reflect.Constructor 
construct = org.apache.hadoop.mapred.JobContext.class.getDeclaredConstructor( - org.apache.hadoop.mapred.JobConf.class, org.apache.hadoop.mapreduce.JobID.class, - Progressable.class); - construct.setAccessible(true); - newContext = (org.apache.hadoop.mapred.JobContext)construct.newInstance(conf, jobId, progressable); - } catch (Exception e) { - throw new RuntimeException(e); - } - return newContext; - } - - @Override - public void commitJob(OutputFormat outputFormat, Job job) throws IOException { - if( job.getConfiguration().get("mapred.job.tracker", "").equalsIgnoreCase("local") ) { - try { - //In local mode, mapreduce will not call OutputCommitter.cleanupJob. - //Calling it from here so that the partition publish happens. - //This call needs to be removed after MAPREDUCE-1447 is fixed. - outputFormat.getOutputCommitter(HCatHadoopShims.Instance.get().createTaskAttemptContext( - job.getConfiguration(), HCatHadoopShims.Instance.get().createTaskAttemptID())).commitJob(job); - } catch (IOException e) { - throw new IOException("Failed to cleanup job",e); - } catch (InterruptedException e) { - throw new IOException("Failed to cleanup job",e); - } - } - } - - @Override - public void abortJob(OutputFormat outputFormat, Job job) throws IOException { - if (job.getConfiguration().get("mapred.job.tracker", "") - .equalsIgnoreCase("local")) { - try { - // This call needs to be removed after MAPREDUCE-1447 is fixed. - outputFormat.getOutputCommitter(HCatHadoopShims.Instance.get().createTaskAttemptContext( - job.getConfiguration(), new TaskAttemptID())).abortJob(job, State.FAILED); - } catch (IOException e) { - throw new IOException("Failed to abort job", e); - } catch (InterruptedException e) { - throw new IOException("Failed to abort job", e); - } - } - } - - @Override - public InetSocketAddress getResourceManagerAddress(Configuration conf) - { - return JobTracker.getAddress(conf); - } - - @Override - public String getPropertyName(PropertyName name) { - switch (name) { - case CACHE_ARCHIVES: - return DistributedCache.CACHE_ARCHIVES; - case CACHE_FILES: - return DistributedCache.CACHE_FILES; - case CACHE_SYMLINK: - return DistributedCache.CACHE_SYMLINK; - } - - return ""; - } - - @Override - public boolean isFileInHDFS(FileSystem fs, Path path) throws IOException { - // In hadoop 1.x.x the file system URI is sufficient to determine the uri of the file - return "hdfs".equals(fs.getUri().getScheme()); - } - - @Override - public InetSocketAddress getAddress(Configuration conf) { - String jobTrackerStr = - conf.get("mapred.job.tracker", "localhost:8012"); - return NetUtils.createSocketAddr(jobTrackerStr); - } -} diff --git hcatalog/shims/src/23/java/org/apache/hadoop/mapred/TempletonJobTracker.java hcatalog/shims/src/23/java/org/apache/hadoop/mapred/TempletonJobTracker.java deleted file mode 100644 index 2ad8605..0000000 --- hcatalog/shims/src/23/java/org/apache/hadoop/mapred/TempletonJobTracker.java +++ /dev/null @@ -1,92 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hadoop.mapred; - -import java.io.IOException; -import org.apache.hadoop.conf.Configuration; - -/* - * Communicate with the JobTracker as a specific user. - */ -public class TempletonJobTracker { - private JobClient jc; - - /** - * Create a connection to the Job Tracker. - */ - public TempletonJobTracker(Configuration conf) - throws IOException { - - jc = new JobClient(conf); - } - - /** - * Grab a handle to a job that is already known to the JobTracker. - * - * @return Profile of the job, or null if not found. - */ - public JobProfile getJobProfile(JobID jobid) - throws IOException { - RunningJob rj = jc.getJob(jobid); - JobStatus jobStatus = rj.getJobStatus(); - JobProfile jobProfile = new JobProfile(jobStatus.getUsername(), jobStatus.getJobID(), - jobStatus.getJobFile(), jobStatus.getTrackingUrl(), jobStatus.getJobName()); - return jobProfile; - } - - /** - * Grab a handle to a job that is already known to the JobTracker. - * - * @return Status of the job, or null if not found. - */ - public JobStatus getJobStatus(JobID jobid) - throws IOException { - RunningJob rj = jc.getJob(jobid); - JobStatus jobStatus = rj.getJobStatus(); - return jobStatus; - } - - - /** - * Kill a job. - */ - public void killJob(JobID jobid) - throws IOException { - RunningJob rj = jc.getJob(jobid); - rj.killJob(); - } - - /** - * Get all the jobs submitted. - */ - public JobStatus[] getAllJobs() - throws IOException { - return jc.getAllJobs(); - } - - /** - * Close the connection to the Job Tracker. - */ - public void close() { - try { - jc.close(); - } catch (IOException e) { - } - } -} diff --git hcatalog/shims/src/23/java/org/apache/hcatalog/shims/HCatHadoopShims23.java hcatalog/shims/src/23/java/org/apache/hcatalog/shims/HCatHadoopShims23.java deleted file mode 100644 index 764c0c3..0000000 --- hcatalog/shims/src/23/java/org/apache/hcatalog/shims/HCatHadoopShims23.java +++ /dev/null @@ -1,135 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.hcatalog.shims; - -import java.io.IOException; -import java.net.InetSocketAddress; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.JobID; -import org.apache.hadoop.mapreduce.OutputFormat; -import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.hadoop.mapreduce.TaskID; -import org.apache.hadoop.mapreduce.TaskType; -import org.apache.hadoop.mapreduce.task.JobContextImpl; -import org.apache.hadoop.util.Progressable; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.FileSystem; - - -import org.apache.hadoop.mapreduce.MRJobConfig; -import org.apache.hadoop.net.NetUtils; - -public class HCatHadoopShims23 implements HCatHadoopShims { - @Override - public TaskID createTaskID() { - return new TaskID("", 0, TaskType.MAP, 0); - } - - @Override - public TaskAttemptID createTaskAttemptID() { - return new TaskAttemptID("", 0, TaskType.MAP, 0, 0); - } - - @Override - public org.apache.hadoop.mapreduce.TaskAttemptContext createTaskAttemptContext(Configuration conf, - org.apache.hadoop.mapreduce.TaskAttemptID taskId) { - return new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl( - conf instanceof JobConf? new JobConf(conf) : conf, - taskId); - } - - @Override - public org.apache.hadoop.mapred.TaskAttemptContext createTaskAttemptContext(org.apache.hadoop.mapred.JobConf conf, - org.apache.hadoop.mapred.TaskAttemptID taskId, Progressable progressable) { - org.apache.hadoop.mapred.TaskAttemptContext newContext = null; - try { - java.lang.reflect.Constructor construct = org.apache.hadoop.mapred.TaskAttemptContextImpl.class.getDeclaredConstructor( - org.apache.hadoop.mapred.JobConf.class, org.apache.hadoop.mapred.TaskAttemptID.class, - Reporter.class); - construct.setAccessible(true); - newContext = (org.apache.hadoop.mapred.TaskAttemptContext) construct.newInstance( - new JobConf(conf), taskId, (Reporter) progressable); - } catch (Exception e) { - throw new RuntimeException(e); - } - return newContext; - } - - @Override - public JobContext createJobContext(Configuration conf, - JobID jobId) { - return new JobContextImpl(conf instanceof JobConf? new JobConf(conf) : conf, - jobId); - } - - @Override - public org.apache.hadoop.mapred.JobContext createJobContext(org.apache.hadoop.mapred.JobConf conf, - org.apache.hadoop.mapreduce.JobID jobId, Progressable progressable) { - return new org.apache.hadoop.mapred.JobContextImpl( - new JobConf(conf), jobId, (org.apache.hadoop.mapred.Reporter) progressable); - } - - @Override - public void commitJob(OutputFormat outputFormat, Job job) throws IOException { - // Do nothing as this was fixed by MAPREDUCE-1447. - } - - @Override - public void abortJob(OutputFormat outputFormat, Job job) throws IOException { - // Do nothing as this was fixed by MAPREDUCE-1447. 
- } - - @Override - public InetSocketAddress getResourceManagerAddress(Configuration conf) { - String addr = conf.get("yarn.resourcemanager.address", "localhost:8032"); - - return NetUtils.createSocketAddr(addr); - } - - @Override - public String getPropertyName(PropertyName name) { - switch (name) { - case CACHE_ARCHIVES: - return MRJobConfig.CACHE_ARCHIVES; - case CACHE_FILES: - return MRJobConfig.CACHE_FILES; - case CACHE_SYMLINK: - return MRJobConfig.CACHE_SYMLINK; - } - - return ""; - } - - @Override - public boolean isFileInHDFS(FileSystem fs, Path path) throws IOException { - // In case of viewfs we need to lookup where the actual file is to know the filesystem in use. - // resolvePath is a sure shot way of knowing which file system the file is. - return "hdfs".equals(fs.resolvePath(path).toUri().getScheme()); - } - - @Override - public InetSocketAddress getAddress(Configuration conf) { - return null; - } -} diff --git hcatalog/webhcat/svr/pom.xml hcatalog/webhcat/svr/pom.xml index edbca90..d34724e 100644 --- hcatalog/webhcat/svr/pom.xml +++ hcatalog/webhcat/svr/pom.xml @@ -36,7 +36,18 @@ http://maven.apache.org - + @@ -74,12 +85,6 @@ compile - com.sun.jersey - jersey-json - ${jersey.version} - compile - - org.codehaus.jackson jackson-core-asl ${jackson.version} diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/DeleteDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/DeleteDelegator.java index af7b2bd..372f3b3 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/DeleteDelegator.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/DeleteDelegator.java @@ -19,8 +19,10 @@ package org.apache.hcatalog.templeton; import java.io.IOException; + +import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJobTracker; +import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.mapred.JobID; -import org.apache.hadoop.mapred.TempletonJobTracker; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hcatalog.templeton.tool.JobState; @@ -36,10 +38,10 @@ public QueueStatusBean run(String user, String id) throws NotAuthorizedException, BadParam, IOException, InterruptedException { UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user); - TempletonJobTracker tracker = null; + WebHCatJobTracker tracker = null; JobState state = null; try { - tracker = new TempletonJobTracker(appConf); + tracker = ShimLoader.getHadoopShims().getWebHCatShim(appConf); JobID jobid = StatusDelegator.StringToJobID(id); if (jobid == null) throw new BadParam("Invalid jobid: " + id); diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/ListDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/ListDelegator.java index c963953..69e6771 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/ListDelegator.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/ListDelegator.java @@ -22,8 +22,9 @@ import java.util.List; import java.util.ArrayList; +import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJobTracker; +import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.mapred.JobStatus; -import org.apache.hadoop.mapred.TempletonJobTracker; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hcatalog.templeton.tool.JobState; @@ -39,9 +40,9 @@ public ListDelegator(AppConfig appConf) { throws NotAuthorizedException, BadParam, IOException, InterruptedException { 
UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user); - TempletonJobTracker tracker = null; + WebHCatJobTracker tracker = null; try { - tracker = new TempletonJobTracker(appConf); + tracker = ShimLoader.getHadoopShims().getWebHCatShim(appConf); ArrayList ids = new ArrayList(); diff --git hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/StatusDelegator.java hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/StatusDelegator.java index 27f80d3..e40f0b3 100644 --- hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/StatusDelegator.java +++ hcatalog/webhcat/svr/src/main/java/org/apache/hcatalog/templeton/StatusDelegator.java @@ -22,10 +22,11 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJobTracker; +import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.mapred.JobID; import org.apache.hadoop.mapred.JobProfile; import org.apache.hadoop.mapred.JobStatus; -import org.apache.hadoop.mapred.TempletonJobTracker; import org.apache.hcatalog.templeton.tool.JobState; /** @@ -41,10 +42,10 @@ public StatusDelegator(AppConfig appConf) { public QueueStatusBean run(String user, String id) throws NotAuthorizedException, BadParam, IOException, InterruptedException { - TempletonJobTracker tracker = null; + WebHCatJobTracker tracker = null; JobState state = null; try { - tracker = new TempletonJobTracker(appConf); + tracker = ShimLoader.getHadoopShims().getWebHCatShim(appConf); JobID jobid = StatusDelegator.StringToJobID(id); if (jobid == null) throw new BadParam("Invalid jobid: " + id); @@ -60,7 +61,7 @@ public QueueStatusBean run(String user, String id) } } - public static QueueStatusBean makeStatus(TempletonJobTracker tracker, + public static QueueStatusBean makeStatus(WebHCatJobTracker tracker, JobID jobid, String childid, JobState state) @@ -87,7 +88,7 @@ public static QueueStatusBean makeStatus(TempletonJobTracker tracker, return new QueueStatusBean(state, status, profile); } - public static QueueStatusBean makeStatus(TempletonJobTracker tracker, + public static QueueStatusBean makeStatus(WebHCatJobTracker tracker, JobID jobid, JobState state) throws BadParam, IOException { diff --git shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java index d2bb34d..bf5de38 100644 --- shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java +++ shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java @@ -735,5 +735,12 @@ public String getTokenFileLocEnvName() { throw new UnsupportedOperationException( "Kerberos not supported in current hadoop version"); } - + @Override + public HCatHadoopShims getHCatShim() { + throw new UnsupportedOperationException("HCatalog does not support Hadoop 0.20.x"); + } + @Override + public WebHCatJobTracker getWebHCatShim(Configuration conf) throws IOException { + throw new UnsupportedOperationException("WebHCat does not support Hadoop 0.20.x"); + } } diff --git shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java index 3c93393..05e073f 100644 --- shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java +++ shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java @@ -18,21 +18,31 @@ package org.apache.hadoop.hive.shims; import java.io.IOException; +import java.net.InetSocketAddress; import 
java.net.MalformedURLException; import java.net.URL; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Trash; -import org.apache.hadoop.hive.shims.HadoopShimsSecure; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.mapred.JobTracker; import org.apache.hadoop.mapred.MiniMRCluster; import org.apache.hadoop.mapred.ClusterStatus; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.TaskLogServlet; +import org.apache.hadoop.mapred.WebHCatJobTracker20S; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.mapred.lib.TotalOrderPartitioner; @@ -195,4 +205,129 @@ public void shutdown() { cluster.shutdown(); } } + private volatile HCatHadoopShims hcatShimInstance; + @Override + public HCatHadoopShims getHCatShim() { + if(hcatShimInstance == null) { + hcatShimInstance = new HCatHadoopShims20S(); + } + return hcatShimInstance; + } + private final class HCatHadoopShims20S implements HCatHadoopShims { + @Override + public TaskID createTaskID() { + return new TaskID(); + } + + @Override + public TaskAttemptID createTaskAttemptID() { + return new TaskAttemptID(); + } + + @Override + public TaskAttemptContext createTaskAttemptContext(Configuration conf, TaskAttemptID taskId) { + return new TaskAttemptContext(conf, taskId); + } + + @Override + public org.apache.hadoop.mapred.TaskAttemptContext createTaskAttemptContext(org.apache.hadoop.mapred.JobConf conf, + org.apache.hadoop.mapred.TaskAttemptID taskId, Progressable progressable) { + org.apache.hadoop.mapred.TaskAttemptContext newContext = null; + try { + java.lang.reflect.Constructor construct = org.apache.hadoop.mapred.TaskAttemptContext.class.getDeclaredConstructor( + org.apache.hadoop.mapred.JobConf.class, org.apache.hadoop.mapred.TaskAttemptID.class, + Progressable.class); + construct.setAccessible(true); + newContext = (org.apache.hadoop.mapred.TaskAttemptContext)construct.newInstance(conf, taskId, progressable); + } catch (Exception e) { + throw new RuntimeException(e); + } + return newContext; + } + + @Override + public JobContext createJobContext(Configuration conf, + JobID jobId) { + return new JobContext(conf, jobId); + } + + @Override + public org.apache.hadoop.mapred.JobContext createJobContext(org.apache.hadoop.mapred.JobConf conf, + org.apache.hadoop.mapreduce.JobID jobId, Progressable progressable) { + org.apache.hadoop.mapred.JobContext newContext = null; + try { + java.lang.reflect.Constructor construct = org.apache.hadoop.mapred.JobContext.class.getDeclaredConstructor( + org.apache.hadoop.mapred.JobConf.class, org.apache.hadoop.mapreduce.JobID.class, + Progressable.class); + construct.setAccessible(true); + newContext = (org.apache.hadoop.mapred.JobContext)construct.newInstance(conf, jobId, progressable); + } catch (Exception e) { + throw new RuntimeException(e); + } + return newContext; + } + + @Override + public void commitJob(OutputFormat outputFormat, Job job) throws IOException { + if( 
job.getConfiguration().get("mapred.job.tracker", "").equalsIgnoreCase("local") ) { + try { + //In local mode, mapreduce will not call OutputCommitter.cleanupJob. + //Calling it from here so that the partition publish happens. + //This call needs to be removed after MAPREDUCE-1447 is fixed. + outputFormat.getOutputCommitter(createTaskAttemptContext( + job.getConfiguration(), createTaskAttemptID())).commitJob(job); + } catch (IOException e) { + throw new IOException("Failed to cleanup job",e); + } catch (InterruptedException e) { + throw new IOException("Failed to cleanup job",e); + } + } + } + + @Override + public void abortJob(OutputFormat outputFormat, Job job) throws IOException { + if (job.getConfiguration().get("mapred.job.tracker", "") + .equalsIgnoreCase("local")) { + try { + // This call needs to be removed after MAPREDUCE-1447 is fixed. + outputFormat.getOutputCommitter(createTaskAttemptContext( + job.getConfiguration(), new TaskAttemptID())).abortJob(job, JobStatus.State.FAILED); + } catch (IOException e) { + throw new IOException("Failed to abort job", e); + } catch (InterruptedException e) { + throw new IOException("Failed to abort job", e); + } + } + } + + @Override + public InetSocketAddress getResourceManagerAddress(Configuration conf) + { + return JobTracker.getAddress(conf); + } + + @Override + public String getPropertyName(PropertyName name) { + switch (name) { + case CACHE_ARCHIVES: + return DistributedCache.CACHE_ARCHIVES; + case CACHE_FILES: + return DistributedCache.CACHE_FILES; + case CACHE_SYMLINK: + return DistributedCache.CACHE_SYMLINK; + } + + return ""; + } + + @Override + public boolean isFileInHDFS(FileSystem fs, Path path) throws IOException { + // In hadoop 1.x.x the file system URI is sufficient to determine the uri of the file + return "hdfs".equals(fs.getUri().getScheme()); + } + } + @Override + public WebHCatJobTracker getWebHCatShim(Configuration conf) throws IOException { + return new WebHCatJobTracker20S(conf);//this has state, so can't be cached + } } diff --git shims/src/0.20S/java/org/apache/hadoop/mapred/WebHCatJobTracker20S.java shims/src/0.20S/java/org/apache/hadoop/mapred/WebHCatJobTracker20S.java new file mode 100644 index 0000000..bd82df2 --- /dev/null +++ shims/src/0.20S/java/org/apache/hadoop/mapred/WebHCatJobTracker20S.java @@ -0,0 +1,83 @@ +package org.apache.hadoop.mapred; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJobTracker; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; + +import java.io.IOException; +import java.net.InetSocketAddress; + +/** + * This is in org.apache.hadoop.mapred package because it relies on + * JobSubmissionProtocol which is package private + */ +public class WebHCatJobTracker20S implements WebHCatJobTracker { + private JobSubmissionProtocol cnx; + + /** + * Create a connection to the Job Tracker. + */ + public WebHCatJobTracker20S(Configuration conf) + throws IOException { + UserGroupInformation ugi = UserGroupInformation.getLoginUser(); + cnx = (JobSubmissionProtocol) + RPC.getProxy(JobSubmissionProtocol.class, + JobSubmissionProtocol.versionID, + getAddress(conf), + ugi, + conf, + NetUtils.getSocketFactory(conf, + JobSubmissionProtocol.class)); + } + + /** + * Grab a handle to a job that is already known to the JobTracker. + * + * @return Profile of the job, or null if not found. 
+ */ + public JobProfile getJobProfile(org.apache.hadoop.mapred.JobID jobid) + throws IOException { + return cnx.getJobProfile(jobid); + } + + /** + * Grab a handle to a job that is already known to the JobTracker. + * + * @return Status of the job, or null if not found. + */ + public org.apache.hadoop.mapred.JobStatus getJobStatus(org.apache.hadoop.mapred.JobID jobid) + throws IOException { + return cnx.getJobStatus(jobid); + } + + + /** + * Kill a job. + */ + public void killJob(org.apache.hadoop.mapred.JobID jobid) + throws IOException { + cnx.killJob(jobid); + } + + /** + * Get all the jobs submitted. + */ + public org.apache.hadoop.mapred.JobStatus[] getAllJobs() + throws IOException { + return cnx.getAllJobs(); + } + + /** + * Close the connection to the Job Tracker. + */ + public void close() { + RPC.stopProxy(cnx); + } + private InetSocketAddress getAddress(Configuration conf) { + String jobTrackerStr = conf.get("mapred.job.tracker", "localhost:8012"); + return NetUtils.createSocketAddr(jobTrackerStr); + } + } + diff --git shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java index 62d7fc6..ddb6b38 100644 --- shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java +++ shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.lang.Integer; +import java.net.InetSocketAddress; import java.net.MalformedURLException; import java.net.URL; import java.util.Map; @@ -28,17 +29,24 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Trash; -import org.apache.hadoop.hive.shims.HadoopShims.JobTrackerState; -import org.apache.hadoop.hive.shims.HadoopShimsSecure; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.mapred.MiniMRCluster; import org.apache.hadoop.mapred.ClusterStatus; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.WebHCatJobTracker23; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskID; +import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.task.JobContextImpl; import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; import org.apache.hadoop.mapreduce.util.HostUtil; +import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.mapred.lib.TotalOrderPartitioner; @@ -230,4 +238,104 @@ public void shutdown() { cluster.shutdown(); } } + private volatile HCatHadoopShims hcatShimInstance; + @Override + public HCatHadoopShims getHCatShim() { + if(hcatShimInstance == null) { + hcatShimInstance = new HCatHadoopShims23(); + } + return hcatShimInstance; + } + private final class HCatHadoopShims23 implements HCatHadoopShims { + @Override + public TaskID createTaskID() { + return new TaskID("", 0, TaskType.MAP, 0); + } + + @Override + public TaskAttemptID createTaskAttemptID() { + return new TaskAttemptID("", 0, TaskType.MAP, 0, 0); + } + + @Override + public org.apache.hadoop.mapreduce.TaskAttemptContext createTaskAttemptContext(Configuration conf, + org.apache.hadoop.mapreduce.TaskAttemptID taskId) { + return new 
org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl( + conf instanceof JobConf? new JobConf(conf) : conf, + taskId); + } + + @Override + public org.apache.hadoop.mapred.TaskAttemptContext createTaskAttemptContext(org.apache.hadoop.mapred.JobConf conf, + org.apache.hadoop.mapred.TaskAttemptID taskId, Progressable progressable) { + org.apache.hadoop.mapred.TaskAttemptContext newContext = null; + try { + java.lang.reflect.Constructor construct = org.apache.hadoop.mapred.TaskAttemptContextImpl.class.getDeclaredConstructor( + org.apache.hadoop.mapred.JobConf.class, org.apache.hadoop.mapred.TaskAttemptID.class, + Reporter.class); + construct.setAccessible(true); + newContext = (org.apache.hadoop.mapred.TaskAttemptContext) construct.newInstance( + new JobConf(conf), taskId, (Reporter) progressable); + } catch (Exception e) { + throw new RuntimeException(e); + } + return newContext; + } + + @Override + public JobContext createJobContext(Configuration conf, + JobID jobId) { + return new JobContextImpl(conf instanceof JobConf? new JobConf(conf) : conf, + jobId); + } + + @Override + public org.apache.hadoop.mapred.JobContext createJobContext(org.apache.hadoop.mapred.JobConf conf, + org.apache.hadoop.mapreduce.JobID jobId, Progressable progressable) { + return new org.apache.hadoop.mapred.JobContextImpl( + new JobConf(conf), jobId, (org.apache.hadoop.mapred.Reporter) progressable); + } + + @Override + public void commitJob(OutputFormat outputFormat, Job job) throws IOException { + // Do nothing as this was fixed by MAPREDUCE-1447. + } + + @Override + public void abortJob(OutputFormat outputFormat, Job job) throws IOException { + // Do nothing as this was fixed by MAPREDUCE-1447. + } + + @Override + public InetSocketAddress getResourceManagerAddress(Configuration conf) { + String addr = conf.get("yarn.resourcemanager.address", "localhost:8032"); + + return NetUtils.createSocketAddr(addr); + } + + @Override + public String getPropertyName(PropertyName name) { + switch (name) { + case CACHE_ARCHIVES: + return MRJobConfig.CACHE_ARCHIVES; + case CACHE_FILES: + return MRJobConfig.CACHE_FILES; + case CACHE_SYMLINK: + return MRJobConfig.CACHE_SYMLINK; + } + + return ""; + } + + @Override + public boolean isFileInHDFS(FileSystem fs, Path path) throws IOException { + // In case of viewfs we need to lookup where the actual file is to know the filesystem in use. + // resolvePath is a sure shot way of knowing which file system the file is. + return "hdfs".equals(fs.resolvePath(path).toUri().getScheme()); + } + } + @Override + public WebHCatJobTracker getWebHCatShim(Configuration conf) throws IOException { + return new WebHCatJobTracker23(conf);//this has state, so can't be cached + } } diff --git shims/src/0.23/java/org/apache/hadoop/mapred/WebHCatJobTracker23.java shims/src/0.23/java/org/apache/hadoop/mapred/WebHCatJobTracker23.java new file mode 100644 index 0000000..5316181 --- /dev/null +++ shims/src/0.23/java/org/apache/hadoop/mapred/WebHCatJobTracker23.java @@ -0,0 +1,73 @@ +package org.apache.hadoop.mapred; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJobTracker; + +import java.io.IOException; + +public class WebHCatJobTracker23 implements WebHCatJobTracker { + private JobClient jc; + + /** + * Create a connection to the Job Tracker. + */ + public WebHCatJobTracker23(Configuration conf) + throws IOException { + + jc = new JobClient(conf); + } + + /** + * Grab a handle to a job that is already known to the JobTracker. 
+ * + * @return Profile of the job, or null if not found. + */ + public JobProfile getJobProfile(JobID jobid) + throws IOException { + RunningJob rj = jc.getJob(jobid); + JobStatus jobStatus = rj.getJobStatus(); + JobProfile jobProfile = new JobProfile(jobStatus.getUsername(), jobStatus.getJobID(), + jobStatus.getJobFile(), jobStatus.getTrackingUrl(), jobStatus.getJobName()); + return jobProfile; + } + + /** + * Grab a handle to a job that is already known to the JobTracker. + * + * @return Status of the job, or null if not found. + */ + public JobStatus getJobStatus(JobID jobid) + throws IOException { + RunningJob rj = jc.getJob(jobid); + JobStatus jobStatus = rj.getJobStatus(); + return jobStatus; + } + + + /** + * Kill a job. + */ + public void killJob(JobID jobid) + throws IOException { + RunningJob rj = jc.getJob(jobid); + rj.killJob(); + } + + /** + * Get all the jobs submitted. + */ + public JobStatus[] getAllJobs() + throws IOException { + return jc.getAllJobs(); + } + + /** + * Close the connection to the Job Tracker. + */ + public void close() { + try { + jc.close(); + } catch (IOException e) { + } + } +} diff --git shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java index 30c9fc1..7d49b62 100644 --- shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java +++ shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java @@ -20,6 +20,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.net.InetSocketAddress; import java.net.MalformedURLException; import java.net.URI; import java.net.URISyntaxException; @@ -40,13 +41,19 @@ import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobProfile; +import org.apache.hadoop.mapred.JobStatus; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.RunningJob; import org.apache.hadoop.mapred.TaskCompletionEvent; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Progressable; @@ -461,4 +468,72 @@ RecordReader getRecordReader(JobConf job, InputSplitShim split, Reporter reporte Class> rrClass) throws IOException; } + public HCatHadoopShims getHCatShim(); + public interface HCatHadoopShims { + + enum PropertyName {CACHE_ARCHIVES, CACHE_FILES, CACHE_SYMLINK} + + public TaskID createTaskID(); + + public TaskAttemptID createTaskAttemptID(); + + public org.apache.hadoop.mapreduce.TaskAttemptContext createTaskAttemptContext(Configuration conf, + TaskAttemptID taskId); + + public org.apache.hadoop.mapred.TaskAttemptContext createTaskAttemptContext(JobConf conf, + org.apache.hadoop.mapred.TaskAttemptID taskId, Progressable progressable); + + public JobContext createJobContext(Configuration conf, JobID jobId); + + public org.apache.hadoop.mapred.JobContext createJobContext(JobConf conf, JobID jobId, Progressable progressable); + + public void commitJob(OutputFormat outputFormat, Job job) throws IOException; + + public void abortJob(OutputFormat outputFormat, Job job) throws 
IOException; + + /* Referring to job tracker in 0.20 and resource manager in 0.23 */ + public InetSocketAddress getResourceManagerAddress(Configuration conf); + + public String getPropertyName(PropertyName name); + + /** + * Checks if file is in HDFS filesystem. + * + * @param fs + * @param path + * @return true if the file is in HDFS, false if the file is in other file systems. + */ + public boolean isFileInHDFS(FileSystem fs, Path path) throws IOException; + } + /** + * Provides a handle on a hadoop job. + * @param conf not null + */ + public WebHCatJobTracker getWebHCatShim(Configuration conf) throws IOException; + public interface WebHCatJobTracker { + /** + * Grab a handle to a job that is already known to the JobTracker. + * + * @return Profile of the job, or null if not found. + */ + public JobProfile getJobProfile(org.apache.hadoop.mapred.JobID jobid) throws IOException; + /** + * Grab a handle to a job that is already known to the JobTracker. + * + * @return Status of the job, or null if not found. + */ + public JobStatus getJobStatus(org.apache.hadoop.mapred.JobID jobid) throws IOException; + /** + * Kill a job. + */ + public void killJob(org.apache.hadoop.mapred.JobID jobid) throws IOException; + /** + * Get all the jobs submitted. + */ + public JobStatus[] getAllJobs() throws IOException; + /** + * Close the connection to the Job Tracker. + */ + public void close(); + } }
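
For reference, a minimal sketch of how a Templeton delegator is expected to obtain and use the shim introduced above, mirroring the StatusDelegator/DeleteDelegator changes in this patch. Only ShimLoader.getHadoopShims().getWebHCatShim(conf) and the WebHCatJobTracker methods come from the patch itself; the class name, the plain Configuration field, and JobID.forName() are illustrative stand-ins for the real delegator plumbing (AppConfig, StatusDelegator.StringToJobID, UserGroupInformation handling).

// Illustrative only -- not part of the patch.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJobTracker;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.JobStatus;

public class ExampleStatusLookup {
    private final Configuration conf;

    public ExampleStatusLookup(Configuration conf) {
        this.conf = conf;
    }

    public JobStatus lookup(String id) throws IOException {
        WebHCatJobTracker tracker = null;
        try {
            // The shim hides whether the handle talks to the JobTracker over the
            // package-private JobSubmissionProtocol (0.20S) or wraps a JobClient
            // (0.23); callers no longer reference TempletonJobTracker directly.
            tracker = ShimLoader.getHadoopShims().getWebHCatShim(conf);
            JobID jobid = JobID.forName(id);
            return tracker.getJobStatus(jobid);
        } finally {
            if (tracker != null)
                tracker.close();  // each call returns a fresh, stateful instance
        }
    }
}

Because getWebHCatShim() returns a new connection on every call ("this has state, so can't be cached"), the caller is expected to close the returned tracker once the request completes.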