diff --git build-common.xml build-common.xml
index f4367c0..96e8046 100644
--- build-common.xml
+++ build-common.xml
@@ -463,7 +463,7 @@
-      <jvmarg line="${jvm.args}"/>
+      <jvmarg line="${junit.jvm.args}"/>
diff --git build.properties build.properties
index 2b9127a..fd61db1 100644
--- build.properties
+++ build.properties
@@ -141,6 +141,9 @@
 datanucleus.repo=http://www.datanucleus.org/downloads/maven2
 
 # JVM arguments
 jvm.args=-XX:-UseSplitVerifier
 
+# junit jvm args
+junit.jvm.args=-XX:-UseSplitVerifier -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=128M
+
 #
 # Eclipse Properties
 #
diff --git ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java
index 6b35e7c..f6a68f1 100644
--- ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java
+++ ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java
@@ -46,7 +46,6 @@
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileStatus;
@@ -79,7 +78,6 @@
 import org.apache.hadoop.hive.serde2.thrift.test.Complex;
 import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.hadoop.mapred.TextInputFormat;
@@ -117,7 +115,7 @@
   private FileSystem fs;
   protected final boolean overWrite;
   private CliDriver cliDriver;
-  private MiniMRCluster mr = null;
+  private HadoopShims.MiniMrShim mr = null;
   private HadoopShims.MiniDFSShim dfs = null;
   private boolean miniMr = false;
   private String hadoopVer = null;
@@ -222,6 +220,9 @@ public void initConf() throws Exception {
     if (miniMr) {
       assert dfs != null;
       assert mr != null;
+
+      mr.setupConfiguration(conf);
+
       // set fs.default.name to the uri of mini-dfs
       String dfsUriString = getHdfsUriString(dfs.getFileSystem().getUri().toString());
       conf.setVar(HiveConf.ConfVars.HADOOPFS, dfsUriString);
@@ -229,25 +230,6 @@
       conf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE,
           (new Path(dfsUriString, "/build/ql/test/data/warehouse/")).toString());
 
-      int port = 0;
-
-      try {
-        // Hadoop20 MiniMRCluster will return a proper port.
-        // Hadoop23 MiniMRCluster does not implement this method so use the default RM port.
-        port = mr.getJobTrackerPort();
-      } catch (UnsupportedOperationException e) {
-        String address =
-            StringUtils.substringAfterLast(conf.get("yarn.resourcemanager.address"), ":");
-
-        if (StringUtils.isBlank(address)) {
-          throw new IllegalArgumentException("Invalid YARN resource manager port.");
-        }
-
-        port = Integer.parseInt(address);
-      }
-
-      ShimLoader.getHadoopShims().setJobLauncherRpcAddress(conf,
-          "localhost:" + port);
     }
   }
@@ -299,7 +281,7 @@
     if (miniMr) {
       dfs = ShimLoader.getHadoopShims().getMiniDfs(conf, 4, true, null);
       FileSystem fs = dfs.getFileSystem();
-      mr = new MiniMRCluster(4, getHdfsUriString(fs.getUri().toString()), 1);
+      mr = ShimLoader.getHadoopShims().getMiniMrCluster(conf, 4, getHdfsUriString(fs.getUri().toString()), 1);
     }
 
     initConf();
@@ -374,7 +356,7 @@ public void addFile(File qf) throws Exception {
   private boolean checkNeedsSort(String fileName, String query) {
     Pattern pattern = Pattern.compile("-- SORT_BEFORE_DIFF");
     Matcher matcher = pattern.matcher(query);
-    
+
     if (matcher.find()) {
       return true;
     }
@@ -840,7 +822,7 @@ public int checkNegativeResults(String tname, Exception e) throws Exception {
     outfd.write(e.getMessage());
     outfd.close();
 
-    int exitVal = executeDiffCommand(outf.getPath(), expf, false, 
+    int exitVal = executeDiffCommand(outf.getPath(), expf, false,
         qSortSet.contains(qf.getName()));
     if (exitVal != 0 && overWrite) {
       exitVal = overwriteResults(outf.getPath(), expf);
@@ -1061,7 +1043,7 @@ private static int executeDiffCommand(String inFileName,
       ) throws Exception {
 
     int result = 0;
-    
+
     if (sortResults) {
       // sort will try to open the output file in write mode on windows. We need to
       // close it first.
@@ -1140,18 +1122,18 @@ private static int executeCmd(Collection<String> args, String outFile, String er
   private static int executeCmd(String[] args, String outFile, String errFile) throws Exception {
     System.out.println("Running: " + org.apache.commons.lang.StringUtils.join(args, ' '));
 
-    PrintStream out = outFile == null ? 
-        SessionState.getConsole().getChildOutStream() : 
+    PrintStream out = outFile == null ?
+        SessionState.getConsole().getChildOutStream() :
         new PrintStream(new FileOutputStream(outFile), true);
-    PrintStream err = errFile == null ? 
-        SessionState.getConsole().getChildErrStream() : 
+    PrintStream err = errFile == null ?
+        SessionState.getConsole().getChildErrStream() :
         new PrintStream(new FileOutputStream(errFile), true);
 
     Process executor = Runtime.getRuntime().exec(args);
 
     StreamPrinter errPrinter = new StreamPrinter(executor.getErrorStream(), null, err);
     StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(), null, out);
-    
+
     outPrinter.start();
     errPrinter.start();
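Taken together, the QTestUtil changes reduce the mini-cluster handling to a version-neutral lifecycle. A minimal sketch of that lifecycle follows; the shim types and methods are from this patch, while the standalone wrapper class and the try/finally framing are illustrative rather than QTestUtil's actual structure:

```java
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.hive.shims.ShimLoader;

public class MiniClusterLifecycleSketch {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf();
    HadoopShims shims = ShimLoader.getHadoopShims();

    // The shim layer picks the MiniDFSCluster/MiniMRCluster flavor that
    // matches the Hadoop version on the classpath.
    HadoopShims.MiniDFSShim dfs = shims.getMiniDfs(conf, 4, true, null);
    HadoopShims.MiniMrShim mr = shims.getMiniMrCluster(
        conf, 4, dfs.getFileSystem().getUri().toString(), 1);
    try {
      // Instead of the caller fishing ports out of the cluster, the shim
      // injects whatever the underlying cluster needs: a job tracker RPC
      // address on Hadoop 1, the generated YARN JobConf on Hadoop 2.
      mr.setupConfiguration(conf);
      // ... run q-file tests against conf ...
    } finally {
      mr.shutdown();
      dfs.shutdown();
    }
  }
}
```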
diff --git shims/ivy.xml shims/ivy.xml
index a18634b..f733d64 100644
--- shims/ivy.xml
+++ shims/ivy.xml
@@ -91,6 +91,36 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
index 84ce3d8..c9baa7f 100644
--- shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
+++ shims/src/0.20/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
@@ -52,6 +52,7 @@
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobContext;
 import org.apache.hadoop.mapred.JobStatus;
+import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.hadoop.mapred.OutputCommitter;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
@@ -100,6 +101,43 @@
   public void setTmpFiles(String prop, String files) {
     // gone in 20+
   }
 
+  /**
+   * Returns a shim to wrap MiniMrCluster
+   */
+  public MiniMrShim getMiniMrCluster(Configuration conf, int numberOfTaskTrackers,
+      String nameNode, int numDir) throws IOException {
+    return new MiniMrShim(conf, numberOfTaskTrackers, nameNode, numDir);
+  }
+
+  /**
+   * Shim for MiniMrCluster
+   */
+  public class MiniMrShim implements HadoopShims.MiniMrShim {
+
+    private final MiniMRCluster mr;
+
+    public MiniMrShim(Configuration conf, int numberOfTaskTrackers,
+        String nameNode, int numDir) throws IOException {
+      this.mr = new MiniMRCluster(numberOfTaskTrackers, nameNode, numDir);
+    }
+
+    @Override
+    public int getJobTrackerPort() throws UnsupportedOperationException {
+      return mr.getJobTrackerPort();
+    }
+
+    @Override
+    public void shutdown() throws IOException {
+      mr.shutdown();
+    }
+
+    @Override
+    public void setupConfiguration(Configuration conf) {
+      setJobLauncherRpcAddress(conf, "localhost:" + mr.getJobTrackerPort());
+    }
+  }
+
   public HadoopShims.MiniDFSShim getMiniDfs(Configuration conf,
       int numDataNodes,
       boolean format,
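`setJobLauncherRpcAddress` is defined elsewhere in the shim layer; on the Hadoop 1 lineages it amounts to setting the classic job tracker address. So the `setupConfiguration` above is roughly equivalent to this fragment (property name as used by Hadoop 1; a sketch, not patch code, and `conf`/`mr` stand for the fields in the shim):

```java
// Approximate effect of Hadoop20Shims.MiniMrShim.setupConfiguration(conf):
// point Hive's job launcher at the mini cluster's job tracker.
conf.set("mapred.job.tracker", "localhost:" + mr.getJobTrackerPort());
```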
diff --git shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
index b7515a5..e4a632d 100644
--- shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
+++ shims/src/0.20S/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
@@ -26,6 +26,8 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Trash;
 import org.apache.hadoop.hive.shims.HadoopShimsSecure;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.hadoop.mapred.ClusterStatus;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.TaskLogServlet;
@@ -119,4 +121,71 @@ public long getDefaultBlockSize(FileSystem fs, Path path) {
   public short getDefaultReplication(FileSystem fs, Path path) {
     return fs.getDefaultReplication();
   }
+
+  /**
+   * Returns a shim to wrap MiniMrCluster
+   */
+  public MiniMrShim getMiniMrCluster(Configuration conf, int numberOfTaskTrackers,
+      String nameNode, int numDir) throws IOException {
+    return new MiniMrShim(conf, numberOfTaskTrackers, nameNode, numDir);
+  }
+
+  /**
+   * Shim for MiniMrCluster
+   */
+  public class MiniMrShim implements HadoopShims.MiniMrShim {
+
+    private final MiniMRCluster mr;
+
+    public MiniMrShim(Configuration conf, int numberOfTaskTrackers,
+        String nameNode, int numDir) throws IOException {
+      this.mr = new MiniMRCluster(numberOfTaskTrackers, nameNode, numDir);
+    }
+
+    @Override
+    public int getJobTrackerPort() throws UnsupportedOperationException {
+      return mr.getJobTrackerPort();
+    }
+
+    @Override
+    public void shutdown() throws IOException {
+      mr.shutdown();
+    }
+
+    @Override
+    public void setupConfiguration(Configuration conf) {
+      setJobLauncherRpcAddress(conf, "localhost:" + mr.getJobTrackerPort());
+    }
+  }
+
+  // Don't move this code to the parent class. There's a binary
+  // incompatibility between hadoop 1 and 2 wrt MiniDFSCluster and we
+  // need to have two different shim classes even though they are
+  // exactly the same.
+  public HadoopShims.MiniDFSShim getMiniDfs(Configuration conf,
+      int numDataNodes,
+      boolean format,
+      String[] racks) throws IOException {
+    return new MiniDFSShim(new MiniDFSCluster(conf, numDataNodes, format, racks));
+  }
+
+  /**
+   * MiniDFSShim.
+   */
+  public class MiniDFSShim implements HadoopShims.MiniDFSShim {
+    private final MiniDFSCluster cluster;
+
+    public MiniDFSShim(MiniDFSCluster cluster) {
+      this.cluster = cluster;
+    }
+
+    public FileSystem getFileSystem() throws IOException {
+      return cluster.getFileSystem();
+    }
+
+    public void shutdown() {
+      cluster.shutdown();
+    }
+  }
 }
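Because the duplicated `MiniDFSShim` copies sit behind the common `HadoopShims.MiniDFSShim` interface, callers never see which Hadoop version's `MiniDFSCluster` they were compiled against. An illustrative use (the types come from the patch; the smoke-test path is made up):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.hive.shims.ShimLoader;

public class MiniDfsSmokeSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // ShimLoader resolves to Hadoop20SShims or Hadoop23Shims at runtime;
    // either way the caller only sees the common interface.
    HadoopShims.MiniDFSShim dfs =
        ShimLoader.getHadoopShims().getMiniDfs(conf, 2, true, null);
    try {
      FileSystem fs = dfs.getFileSystem();
      fs.mkdirs(new Path("/tmp/minidfs-smoke")); // hypothetical path
    } finally {
      dfs.shutdown();
    }
  }
}
```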
diff --git shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
index 9a22355..1975385 100644
--- shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
+++ shims/src/0.23/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
@@ -21,13 +21,17 @@
 import java.lang.Integer;
 import java.net.MalformedURLException;
 import java.net.URL;
+import java.util.Map;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Trash;
 import org.apache.hadoop.hive.shims.HadoopShims.JobTrackerState;
 import org.apache.hadoop.hive.shims.HadoopShimsSecure;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.hadoop.mapred.ClusterStatus;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
@@ -132,4 +136,91 @@
   public boolean moveToAppropriateTrash(FileSystem fs, Path path, Configuration conf)
       throws IOException {
     return Trash.moveToAppropriateTrash(fs, path, conf);
   }
+
+  /**
+   * Returns a shim to wrap MiniMrCluster
+   */
+  public MiniMrShim getMiniMrCluster(Configuration conf, int numberOfTaskTrackers,
+      String nameNode, int numDir) throws IOException {
+    return new MiniMrShim(conf, numberOfTaskTrackers, nameNode, numDir);
+  }
+
+  /**
+   * Shim for MiniMrCluster
+   */
+  public class MiniMrShim implements HadoopShims.MiniMrShim {
+
+    private final MiniMRCluster mr;
+    private final Configuration conf;
+
+    public MiniMrShim(Configuration conf, int numberOfTaskTrackers,
+        String nameNode, int numDir) throws IOException {
+      this.conf = conf;
+
+      JobConf jConf = new JobConf(conf);
+      jConf.set("yarn.scheduler.capacity.root.queues", "default");
+      jConf.set("yarn.scheduler.capacity.root.default.capacity", "100");
+
+      mr = new MiniMRCluster(numberOfTaskTrackers, nameNode, numDir, null, null, jConf);
+    }
+
+    @Override
+    public int getJobTrackerPort() throws UnsupportedOperationException {
+      String address = conf.get("yarn.resourcemanager.address");
+      address = StringUtils.substringAfterLast(address, ":");
+
+      if (StringUtils.isBlank(address)) {
+        throw new IllegalArgumentException("Invalid YARN resource manager port.");
+      }
+
+      return Integer.parseInt(address);
+    }
+
+    @Override
+    public void shutdown() throws IOException {
+      mr.shutdown();
+    }
+
+    @Override
+    public void setupConfiguration(Configuration conf) {
+      // Copy the mini cluster's generated configuration wholesale so the
+      // caller picks up the YARN addresses chosen at startup.
+      JobConf jConf = mr.createJobConf();
+      for (Map.Entry<String, String> pair : jConf) {
+        conf.set(pair.getKey(), pair.getValue());
+      }
+    }
+  }
+
+  // Don't move this code to the parent class. There's a binary
+  // incompatibility between hadoop 1 and 2 wrt MiniDFSCluster and we
+  // need to have two different shim classes even though they are
+  // exactly the same.
+  public HadoopShims.MiniDFSShim getMiniDfs(Configuration conf,
+      int numDataNodes,
+      boolean format,
+      String[] racks) throws IOException {
+    return new MiniDFSShim(new MiniDFSCluster(conf, numDataNodes, format, racks));
+  }
+
+  /**
+   * MiniDFSShim.
+   */
+  public class MiniDFSShim implements HadoopShims.MiniDFSShim {
+    private final MiniDFSCluster cluster;
+
+    public MiniDFSShim(MiniDFSCluster cluster) {
+      this.cluster = cluster;
+    }
+
+    public FileSystem getFileSystem() throws IOException {
+      return cluster.getFileSystem();
+    }
+
+    public void shutdown() {
+      cluster.shutdown();
+    }
+  }
 }
diff --git shims/src/common-secure/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java shims/src/common-secure/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java
index 1d1bde6..9edcafd 100644
--- shims/src/common-secure/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java
+++ shims/src/common-secure/java/org/apache/hadoop/hive/shims/HadoopShimsSecure.java
@@ -37,7 +37,6 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
 import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
 import org.apache.hadoop.hive.thrift.DelegationTokenSelector;
@@ -106,33 +105,6 @@
   public void setTmpFiles(String prop, String files) {
     // gone in 20+
   }
 
-  public HadoopShims.MiniDFSShim getMiniDfs(Configuration conf,
-      int numDataNodes,
-      boolean format,
-      String[] racks) throws IOException {
-    return new MiniDFSShim(new MiniDFSCluster(conf, numDataNodes, format, racks));
-  }
-
-  /**
-   * MiniDFSShim.
-   */
-  public class MiniDFSShim implements HadoopShims.MiniDFSShim {
-    private final MiniDFSCluster cluster;
-
-    public MiniDFSShim(MiniDFSCluster cluster) {
-      this.cluster = cluster;
-    }
-
-    public FileSystem getFileSystem() throws IOException {
-      return cluster.getFileSystem();
-    }
-
-    public void shutdown() {
-      cluster.shutdown();
-    }
-  }
-
   /**
    * We define this function here to make the code compatible between
    * hadoop 0.17 and hadoop 0.20.
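The Hadoop 0.23 `getJobTrackerPort` relies on `yarn.resourcemanager.address` being a `host:port` string; the MiniMRCluster writes the address it actually bound into the configuration it generates, which `setupConfiguration` then copies. A quick sketch of the parsing behavior (the address value here is illustrative; 8032 is YARN's default RM port):

```java
import org.apache.commons.lang.StringUtils;

public class RmPortParseSketch {
  public static void main(String[] args) {
    // A typical value; the mini cluster substitutes an ephemeral port.
    String address = "localhost:8032";
    // Everything after the last ':' is taken as the port.
    String port = StringUtils.substringAfterLast(address, ":");
    if (StringUtils.isBlank(port)) {
      throw new IllegalArgumentException("Invalid YARN resource manager port.");
    }
    System.out.println(Integer.parseInt(port)); // prints 8032
  }
}
```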
diff --git shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java
index 946e1c1..b0f5077 100644
--- shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java
+++ shims/src/common/java/org/apache/hadoop/hive/shims/HadoopShims.java
@@ -113,6 +113,21 @@ String getTaskAttemptLogUrl(JobConf conf,
   long getAccessTime(FileStatus file);
 
   /**
+   * Returns a shim to wrap MiniMrCluster
+   */
+  public MiniMrShim getMiniMrCluster(Configuration conf, int numberOfTaskTrackers,
+      String nameNode, int numDir) throws IOException;
+
+  /**
+   * Shim for MiniMrCluster
+   */
+  public interface MiniMrShim {
+    public int getJobTrackerPort() throws UnsupportedOperationException;
+    public void shutdown() throws IOException;
+    public void setupConfiguration(Configuration conf);
+  }
+
+  /**
   * Returns a shim to wrap MiniDFSCluster. This is necessary since this class
   * was moved from org.apache.hadoop.dfs to org.apache.hadoop.hdfs
   */
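Any future Hadoop version only needs to supply the three methods of `MiniMrShim`. For illustration, a minimal, entirely hypothetical local-mode implementation (not part of the patch) that satisfies the contract:

```java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.shims.HadoopShims;

// Hypothetical stand-in showing the minimum a new shim must provide;
// it runs jobs in local mode instead of starting a real mini cluster.
public class LocalModeMiniMrShim implements HadoopShims.MiniMrShim {

  @Override
  public int getJobTrackerPort() throws UnsupportedOperationException {
    // Local mode has no job tracker to report.
    throw new UnsupportedOperationException("no job tracker in local mode");
  }

  @Override
  public void shutdown() throws IOException {
    // Nothing to tear down in local mode.
  }

  @Override
  public void setupConfiguration(Configuration conf) {
    // Hadoop 1 property name; "local" selects the in-process runner.
    conf.set("mapred.job.tracker", "local");
  }
}
```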