Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(revision 1096548)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java	(working copy)
@@ -218,7 +218,6 @@
     // should we do checks against the storage (usually hdfs) for operations like drop_partition
     METASTORE_AUTHORIZATION_STORAGE_AUTH_CHECKS("hive.metastore.authorization.storage.checks", false),
 
-
     // Default parameters for creating tables
     NEWTABLEDEFAULTPARA("hive.table.parameters.default",""),
 
@@ -430,9 +429,9 @@
     // deployment. It has not been documented in hive-default.xml intentionally, this should be removed
     // once the feature is stable
     HIVE_MAPPER_CANNOT_SPAN_MULTIPLE_PARTITIONS("hive.mapper.cannot.span.multiple.partitions", false),
+    HIVE_REWORK_MAPREDWORK("hive.rework.mapredwork", false),
     ;
 
-
   public final String varname;
   public final String defaultVal;
   public final int defaultIntVal;
Index: conf/hive-default.xml
===================================================================
--- conf/hive-default.xml	(revision 1096548)
+++ conf/hive-default.xml	(working copy)
@@ -1038,4 +1038,11 @@
   Insert queries are not restricted by this limit.
   </description>
 </property>
 
+<property>
+  <name>hive.rework.mapredwork</name>
+  <value>false</value>
+  <description>should rework the mapred work or not.
+  This is first introduced by SymlinkTextInputFormat to replace symlink files with real paths at compile time.
+  </description>
+</property>
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java	(revision 1096548)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java	(working copy)
@@ -721,7 +721,7 @@
   /**
    * Handle a empty/null path for a given alias.
   */
-  private int addInputPath(String path, JobConf job, MapredWork work, String hiveScratchDir,
+  private static int addInputPath(String path, JobConf job, MapredWork work, String hiveScratchDir,
       int numEmptyPaths, boolean isEmptyPath, String alias) throws Exception {
     // either the directory does not exist or it is empty
     assert path == null || isEmptyPath;
@@ -794,7 +794,7 @@
     return numEmptyPaths;
   }
 
-  private void addInputPaths(JobConf job, MapredWork work, String hiveScratchDir, Context ctx)
+  public static void addInputPaths(JobConf job, MapredWork work, String hiveScratchDir, Context ctx)
       throws Exception {
     int numEmptyPaths = 0;
 
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java	(revision 1096548)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java	(working copy)
@@ -53,7 +53,7 @@
   protected transient boolean queued;
   protected transient HiveConf conf;
   protected transient Hive db;
-  protected transient Log LOG;
+  protected static transient Log LOG;
   protected transient LogHelper console;
   protected transient QueryPlan queryPlan;
   protected transient TaskHandle taskHandle;
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java	(revision 1096548)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java	(working copy)
@@ -93,6 +93,7 @@
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat;
 import org.apache.hadoop.hive.ql.io.RCFile;
+import org.apache.hadoop.hive.ql.io.ReworkMapredInputFormat;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
@@ -2002,4 +2003,47 @@
     sb.append(">");
     _log.info(sb);
   }
+
+  /**
+   * The check here is not entirely clean. It first uses a for loop to go
+   * through all input formats and collect the ones that implement
+   * ReworkMapredInputFormat into a set, and finally goes through that set
+   * and calls rework for each one.
+   *
+   * Technically all of this could be avoided if all of Hive's input formats
+   * shared a common interface. In today's Hive and Hadoop that is not
+   * possible, because many of Hive's input formats live in Hadoop's code
+   * base, and most of Hadoop's input formats only implement InputFormat.
+   *
+   * @param task
+   * @param reworkMapredWork
+   * @param conf
+   * @throws SemanticException
+   */
+  public static void reworkMapRedWork(Task<? extends Serializable> task,
+      boolean reworkMapredWork, HiveConf conf) throws SemanticException {
+    if (reworkMapredWork && (task instanceof MapRedTask)) {
+      try {
+        MapredWork mapredWork = ((MapRedTask) task).getWork();
+        Set<Class<? extends InputFormat>> reworkInputFormats = new HashSet<Class<? extends InputFormat>>();
+        for (PartitionDesc part : mapredWork.getPathToPartitionInfo().values()) {
+          Class<? extends InputFormat> inputFormatCls = part
+              .getInputFileFormatClass();
+          if (ReworkMapredInputFormat.class.isAssignableFrom(inputFormatCls)) {
+            reworkInputFormats.add(inputFormatCls);
+          }
+        }
+
+        if (reworkInputFormats.size() > 0) {
+          for (Class<? extends InputFormat> inputFormatCls : reworkInputFormats) {
+            ReworkMapredInputFormat inst = (ReworkMapredInputFormat) ReflectionUtils
+                .newInstance(inputFormatCls, null);
+            inst.rework(conf, mapredWork);
+          }
+        }
+      } catch (IOException e) {
+        throw new SemanticException(e);
+      }
+    }
+  }
 }
Index: ql/src/java/org/apache/hadoop/hive/ql/io/ReworkMapredInputFormat.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/io/ReworkMapredInputFormat.java	(revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/io/ReworkMapredInputFormat.java	(revision 0)
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
+
+public interface ReworkMapredInputFormat {
+  public void rework(HiveConf job, MapredWork work) throws IOException;
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/io/SymbolicInputFormat.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/io/SymbolicInputFormat.java	(revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/io/SymbolicInputFormat.java	(revision 0)
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.mapred.TextInputFormat;
+
+public class SymbolicInputFormat implements ReworkMapredInputFormat {
+
+  public void rework(HiveConf job, MapredWork work) throws IOException {
+    Map<String, PartitionDesc> pathToParts = work.getPathToPartitionInfo();
+    List<String> toRemovePaths = new ArrayList<String>();
+    Map<String, PartitionDesc> toAddPathToPart = new HashMap<String, PartitionDesc>();
+    Map<String, ArrayList<String>> pathToAliases = work.getPathToAliases();
+
+    for (Map.Entry<String, PartitionDesc> pathPartEntry : pathToParts
+        .entrySet()) {
+      String path = pathPartEntry.getKey();
+      PartitionDesc partDesc = pathPartEntry.getValue();
+      // this path points to a symlink path
+      if (partDesc.getInputFileFormatClass().equals(
+          SymlinkTextInputFormat.class)) {
+        // change to TextInputFormat
+        partDesc.setInputFileFormatClass(TextInputFormat.class);
+        Path symlinkDir = new Path(path);
+        FileSystem fileSystem = symlinkDir.getFileSystem(job);
+        FileStatus fStatus = fileSystem.getFileStatus(symlinkDir);
+        FileStatus[] symlinks = null;
+        if (!fStatus.isDir()) {
+          symlinks = new FileStatus[] { fStatus };
+        } else {
+          symlinks = fileSystem.listStatus(symlinkDir);
+        }
+        toRemovePaths.add(path);
+        ArrayList<String> aliases = pathToAliases.remove(path);
+        for (FileStatus symlink : symlinks) {
+          BufferedReader reader = new BufferedReader(new InputStreamReader(
+              fileSystem.open(symlink.getPath())));
+
+          partDesc.setInputFileFormatClass(TextInputFormat.class);
+
+          String line;
+          while ((line = reader.readLine()) != null) {
+            // no check for the line? How to check?
+            // if the line is invalid for any reason, the job will fail.
+            toAddPathToPart.put(line, partDesc);
+            pathToAliases.put(line, aliases);
+          }
+        }
+      }
+    }
+
+    pathToParts.putAll(toAddPathToPart);
+    for (String toRemove : toRemovePaths) {
+      pathToParts.remove(toRemove);
+    }
+  }
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/io/SymlinkTextInputFormat.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/io/SymlinkTextInputFormat.java	(revision 1096548)
+++ ql/src/java/org/apache/hadoop/hive/ql/io/SymlinkTextInputFormat.java	(working copy)
@@ -23,13 +23,18 @@
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.FileInputFormat;
@@ -49,8 +54,9 @@
  * actual map-reduce input. The target input data should be in TextInputFormat.
  */
 @SuppressWarnings("deprecation")
-public class SymlinkTextInputFormat
-    implements InputFormat<LongWritable, Text>, JobConfigurable, ContentSummaryInputFormat {
+public class SymlinkTextInputFormat extends SymbolicInputFormat implements
+    InputFormat<LongWritable, Text>, JobConfigurable,
+    ContentSummaryInputFormat, ReworkMapredInputFormat {
   /**
    * This input split wraps the FileSplit generated from
    * TextInputFormat.getSplits(), while setting the original link file path
@@ -226,4 +232,5 @@
     }
     return new ContentSummary(summary[0], summary[1], summary[2]);
   }
+
 }
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java	(revision 1096548)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java	(working copy)
@@ -46,6 +46,7 @@
     BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(conf, (ASTNode) ast
         .getChild(0));
     sem.analyze((ASTNode) ast.getChild(0), ctx);
+    sem.validate();
 
     boolean extended = false;
     if (ast.getChildCount() > 1) {
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java	(revision 1096548)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java	(working copy)
@@ -72,6 +72,7 @@
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.io.ReworkMapredInputFormat;
 import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
@@ -164,6 +165,7 @@
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /**
  * Implementation of the semantic analyzer.
@@ -7135,21 +7137,24 @@
             "Table " + tbl.getTableName()));
       }
     }
+
+    boolean reworkMapredWork = HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.HIVE_REWORK_MAPREDWORK);
 
     // validate all tasks
     for (Task<? extends Serializable> rootTask : rootTasks) {
-      validate(rootTask);
+      validate(rootTask, reworkMapredWork);
     }
   }
 
-  private void validate(Task<? extends Serializable> task)
+  private void validate(Task<? extends Serializable> task, boolean reworkMapredWork)
       throws SemanticException {
+    Utilities.reworkMapRedWork(task, reworkMapredWork, conf);
     if (task.getChildTasks() == null) {
       return;
     }
 
    for (Task<? extends Serializable> childTask : task.getChildTasks()) {
-      validate(childTask);
+      validate(childTask, reworkMapredWork);
    }
   }
 
Index: ql/src/test/org/apache/hadoop/hive/ql/io/TestSymlinkTextInputFormat.java
===================================================================
--- ql/src/test/org/apache/hadoop/hive/ql/io/TestSymlinkTextInputFormat.java	(revision 1096548)
+++ ql/src/test/org/apache/hadoop/hive/ql/io/TestSymlinkTextInputFormat.java	(working copy)
@@ -17,8 +17,12 @@
  */
 package org.apache.hadoop.hive.ql.io;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.net.URL;
+import java.net.URLClassLoader;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -30,6 +34,22 @@
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStore;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.QueryPlan;
+import org.apache.hadoop.hive.ql.exec.ExecDriver;
+import org.apache.hadoop.hive.ql.exec.MapRedTask;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.parse.ParseDriver;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.FileInputFormat;
@@ -37,6 +57,8 @@
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /**
  * Unittest for SymlinkTextInputFormat.
@@ -61,8 +83,9 @@
     conf = new Configuration();
     job = new JobConf(conf);
     fileSystem = FileSystem.getLocal(conf);
-    testDir = new Path(System.getProperty("test.data.dir", ".") +
-        "/TestSymlinkTextInputFormat");
+    testDir = new Path(System.getProperty("test.data.dir", System.getProperty(
+        "user.dir", new File(".").getAbsolutePath())) +
+        "/TestSymlinkTextInputFormat");
     reporter = Reporter.NULL;
 
     fileSystem.delete(testDir, true);
@@ -77,6 +100,102 @@
   }
 
   /**
+   * Test combine symlink text input file. Two input dirs, each containing one
+   * file, and one symlink file that points to both of them.
Normally + * without combine, it will return at least 2 splits + */ + public void testCombine() throws Exception { + JobConf newJob = new JobConf(job); + FileSystem fs = dataDir1.getFileSystem(newJob); + int symbolLinkedFileSize = 0; + + Path dir1_file1 = new Path(dataDir1, "combinefile1_1"); + writeTextFile(dir1_file1, + "dir1_file1_line1\n" + + "dir1_file1_line2\n"); + + symbolLinkedFileSize += fs.getFileStatus(dir1_file1).getLen(); + + Path dir2_file1 = new Path(dataDir2, "combinefile2_1"); + writeTextFile(dir2_file1, + "dir2_file1_line1\n" + + "dir2_file1_line2\n"); + + symbolLinkedFileSize += fs.getFileStatus(dir2_file1).getLen(); + + // A symlink file, contains first file from first dir and second file from + // second dir. + writeSymlinkFile( + new Path(symlinkDir, "symlink_file"), + new Path(dataDir1, "combinefile1_1"), + new Path(dataDir2, "combinefile2_1")); + + + HiveConf hiveConf = new HiveConf(TestSymlinkTextInputFormat.class); + + HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_REWORK_MAPREDWORK, true); + HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false); + Driver drv = new Driver(hiveConf); + drv.init(); + String tblName = "text_symlink_text"; + + String createSymlinkTableCmd = "create table " + tblName + " (key int) stored as " + + " inputformat 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat' " + + " outputformat 'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'"; + + SessionState.start(hiveConf); + + boolean tblCreated = false; + try { + int ecode = 0; + ecode = drv.run(createSymlinkTableCmd).getResponseCode(); + if (ecode != 0) { + throw new Exception("Create table command: " + createSymlinkTableCmd + + " failed with exit code= " + ecode); + } + + String loadFileCommand = "LOAD DATA LOCAL INPATH '" + + new Path(symlinkDir, "symlink_file").toString() + "' INTO TABLE " + tblName; + + ecode = drv.run(loadFileCommand).getResponseCode(); + if (ecode != 0) { + throw new Exception("Load data command: " + loadFileCommand + + " failed with exit code= " + ecode); + } + + String cmd = "select key from " + tblName; + drv.compile(cmd); + + //create scratch dir + String emptyScratchDirStr; + Path emptyScratchDir; + Context ctx = new Context(newJob); + emptyScratchDirStr = ctx.getMRTmpFileURI(); + emptyScratchDir = new Path(emptyScratchDirStr); + FileSystem fileSys = emptyScratchDir.getFileSystem(newJob); + fileSys.mkdirs(emptyScratchDir); + + QueryPlan plan = drv.getPlan(); + MapRedTask selectTask = (MapRedTask)plan.getRootTasks().get(0); + + ExecDriver.addInputPaths(newJob, selectTask.getWork(), emptyScratchDir.toString(), ctx); + Utilities.setMapRedWork(newJob, selectTask.getWork(), ctx.getMRTmpFileURI()); + + CombineHiveInputFormat combineInputFormat = ReflectionUtils.newInstance( + CombineHiveInputFormat.class, newJob); + InputSplit[] retSplits = combineInputFormat.getSplits(newJob, 1); + assertEquals(1, retSplits.length); + } catch (Exception e) { + e.printStackTrace(); + } finally { + if (tblCreated) { + drv.run("drop table text_symlink_text;").getResponseCode(); + } + } + } + + /** * Test scenario: Two data directories, one symlink file that contains two * paths each point to a file in one of data directories. */