Index: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
===================================================================
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (revision 1096548)
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (working copy)
@@ -218,7 +218,6 @@
// should we do checks against the storage (usually hdfs) for operations like drop_partition
METASTORE_AUTHORIZATION_STORAGE_AUTH_CHECKS("hive.metastore.authorization.storage.checks", false),
-
// Default parameters for creating tables
NEWTABLEDEFAULTPARA("hive.table.parameters.default",""),
@@ -430,9 +429,9 @@
// deployment. It has not been documented in hive-default.xml intentionally, this should be removed
// once the feature is stable
HIVE_MAPPER_CANNOT_SPAN_MULTIPLE_PARTITIONS("hive.mapper.cannot.span.multiple.partitions", false),
+ HIVE_REWORK_MAPREDWORK("hive.rework.mapredwork", false),
;
-
public final String varname;
public final String defaultVal;
public final int defaultIntVal;
Index: conf/hive-default.xml
===================================================================
--- conf/hive-default.xml (revision 1096548)
+++ conf/hive-default.xml (working copy)
@@ -1038,4 +1038,11 @@
Insert queries are not restricted by this limit.
+
+<property>
+  <name>hive.rework.mapredwork</name>
+  <value>false</value>
+  <description>should rework the mapred work or not.
+  This is first introduced by SymlinkTextInputFormat to replace symlink files with real paths at compile time.</description>
+</property>
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (revision 1096548)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (working copy)
@@ -721,7 +721,7 @@
/**
* Handle a empty/null path for a given alias.
*/
- private int addInputPath(String path, JobConf job, MapredWork work, String hiveScratchDir,
+ private static int addInputPath(String path, JobConf job, MapredWork work, String hiveScratchDir,
int numEmptyPaths, boolean isEmptyPath, String alias) throws Exception {
// either the directory does not exist or it is empty
assert path == null || isEmptyPath;
@@ -794,7 +794,7 @@
return numEmptyPaths;
}
- private void addInputPaths(JobConf job, MapredWork work, String hiveScratchDir, Context ctx)
+ public static void addInputPaths(JobConf job, MapredWork work, String hiveScratchDir, Context ctx)
throws Exception {
int numEmptyPaths = 0;
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java (revision 1096548)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java (working copy)
@@ -53,7 +53,7 @@
protected transient boolean queued;
protected transient HiveConf conf;
protected transient Hive db;
- protected transient Log LOG;
+ protected static transient Log LOG;
protected transient LogHelper console;
protected transient QueryPlan queryPlan;
protected transient TaskHandle taskHandle;
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (revision 1096548)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (working copy)
@@ -93,6 +93,7 @@
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat;
import org.apache.hadoop.hive.ql.io.RCFile;
+import org.apache.hadoop.hive.ql.io.ReworkMapredInputFormat;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
@@ -2002,4 +2003,47 @@
sb.append(">");
_log.info(sb);
}
+
+ /**
+ * The check here is not very clean. It first loops over all input formats
+ * and collects the ones that implement ReworkMapredInputFormat into a set,
+ * and then goes through that set and calls rework() on each of them.
+ *
+ * Technically all of this could be avoided if all of Hive's input formats
+ * shared a common interface. In today's Hive and Hadoop that is not
+ * possible, because many of the input formats Hive uses live in Hadoop's
+ * code base, and most of Hadoop's input formats only implement the
+ * InputFormat interface.
+ *
+ * @param task
+ * @param reworkMapredWork
+ * @param conf
+ * @throws SemanticException
+ */
+ public static void reworkMapRedWork(Task<? extends Serializable> task,
+ boolean reworkMapredWork, HiveConf conf) throws SemanticException {
+ if (reworkMapredWork && (task instanceof MapRedTask)) {
+ try {
+ MapredWork mapredWork = ((MapRedTask) task).getWork();
+ Set<Class<? extends InputFormat>> reworkInputFormats = new HashSet<Class<? extends InputFormat>>();
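+ // collect each ReworkMapredInputFormat implementor only once, so rework()
+ // runs a single time per input format class even if many partitions use it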
+ for (PartitionDesc part : mapredWork.getPathToPartitionInfo().values()) {
+ Class<? extends InputFormat> inputFormatCls = part
+ .getInputFileFormatClass();
+ if (ReworkMapredInputFormat.class.isAssignableFrom(inputFormatCls)) {
+ reworkInputFormats.add(inputFormatCls);
+ }
+ }
+
+ if (reworkInputFormats.size() > 0) {
+ for (Class<? extends InputFormat> inputFormatCls : reworkInputFormats) {
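+ // instantiate with a null Configuration; the HiveConf is passed to
+ // rework() explicitly below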
+ ReworkMapredInputFormat inst = (ReworkMapredInputFormat) ReflectionUtils
+ .newInstance(inputFormatCls, null);
+ inst.rework(conf, mapredWork);
+ }
+ }
+ } catch (IOException e) {
+ throw new SemanticException(e);
+ }
+ }
+ }
}
Index: ql/src/java/org/apache/hadoop/hive/ql/io/ReworkMapredInputFormat.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/io/ReworkMapredInputFormat.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/io/ReworkMapredInputFormat.java (revision 0)
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
+
+public interface ReworkMapredInputFormat {
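+ /**
+  * Rework the given MapredWork at compile time, for example by replacing
+  * symlink paths with the real data paths they point to (as done by
+  * SymbolicInputFormat for SymlinkTextInputFormat).
+  */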
+ public void rework(HiveConf job, MapredWork work) throws IOException;
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/io/SymbolicInputFormat.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/io/SymbolicInputFormat.java (revision 0)
+++ ql/src/java/org/apache/hadoop/hive/ql/io/SymbolicInputFormat.java (revision 0)
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.mapred.TextInputFormat;
+
+public class SymbolicInputFormat implements ReworkMapredInputFormat {
+
+ public void rework(HiveConf job, MapredWork work) throws IOException {
+ Map<String, PartitionDesc> pathToParts = work.getPathToPartitionInfo();
+ List<String> toRemovePaths = new ArrayList<String>();
+ Map<String, PartitionDesc> toAddPathToPart = new HashMap<String, PartitionDesc>();
+ Map<String, ArrayList<String>> pathToAliases = work.getPathToAliases();
+
+ for (Map.Entry<String, PartitionDesc> pathPartEntry : pathToParts
+ .entrySet()) {
+ String path = pathPartEntry.getKey();
+ PartitionDesc partDesc = pathPartEntry.getValue();
+ // this path points to a symlink path
+ if (partDesc.getInputFileFormatClass().equals(
+ SymlinkTextInputFormat.class)) {
+ // change to TextInputFormat
+ partDesc.setInputFileFormatClass(TextInputFormat.class);
+ Path symlinkDir = new Path(path);
+ FileSystem fileSystem = symlinkDir.getFileSystem(job);
+ FileStatus fStatus = fileSystem.getFileStatus(symlinkDir);
+ FileStatus[] symlinks = null;
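+ // the registered path may be a single symlink file or a directory
+ // containing several symlink files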
+ if (!fStatus.isDir()) {
+ symlinks = new FileStatus[] { fStatus };
+ } else {
+ symlinks = fileSystem.listStatus(symlinkDir);
+ }
+ toRemovePaths.add(path);
+ ArrayList<String> aliases = pathToAliases.remove(path);
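+ // every line of a symlink file is the path of a real data file; register
+ // each of those paths with the same partition descriptor and aliases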
+ for (FileStatus symlink : symlinks) {
+ BufferedReader reader = new BufferedReader(new InputStreamReader(
+ fileSystem.open(symlink.getPath())));
+
+ partDesc.setInputFileFormatClass(TextInputFormat.class);
+
+ String line;
+ while ((line = reader.readLine()) != null) {
+ // the line is not validated here; if it is not a valid path for any
+ // reason, the job will fail later.
+ toAddPathToPart.put(line, partDesc);
+ pathToAliases.put(line, aliases);
+ }
+ }
+ }
+ }
+
+ pathToParts.putAll(toAddPathToPart);
+ for (String toRemove : toRemovePaths) {
+ pathToParts.remove(toRemove);
+ }
+ }
+}
Index: ql/src/java/org/apache/hadoop/hive/ql/io/SymlinkTextInputFormat.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/io/SymlinkTextInputFormat.java (revision 1096548)
+++ ql/src/java/org/apache/hadoop/hive/ql/io/SymlinkTextInputFormat.java (working copy)
@@ -23,13 +23,18 @@
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
@@ -49,8 +54,9 @@
* actual map-reduce input. The target input data should be in TextInputFormat.
*/
@SuppressWarnings("deprecation")
-public class SymlinkTextInputFormat
- implements InputFormat<LongWritable, Text>, JobConfigurable, ContentSummaryInputFormat {
+public class SymlinkTextInputFormat extends SymbolicInputFormat implements
+ InputFormat<LongWritable, Text>, JobConfigurable,
+ ContentSummaryInputFormat, ReworkMapredInputFormat {
/**
* This input split wraps the FileSplit generated from
* TextInputFormat.getSplits(), while setting the original link file path
@@ -226,4 +232,5 @@
}
return new ContentSummary(summary[0], summary[1], summary[2]);
}
+
}
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java (revision 1096548)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/ExplainSemanticAnalyzer.java (working copy)
@@ -46,6 +46,7 @@
BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(conf, (ASTNode) ast
.getChild(0));
sem.analyze((ASTNode) ast.getChild(0), ctx);
+ sem.validate();
boolean extended = false;
if (ast.getChildCount() > 1) {
Index: ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (revision 1096548)
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (working copy)
@@ -72,6 +72,7 @@
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.io.ReworkMapredInputFormat;
import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
import org.apache.hadoop.hive.ql.lib.Dispatcher;
@@ -164,6 +165,7 @@
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
/**
* Implementation of the semantic analyzer.
@@ -7135,21 +7137,24 @@
"Table " + tbl.getTableName()));
}
}
+
+ boolean reworkMapredWork = HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.HIVE_REWORK_MAPREDWORK);
// validate all tasks
for (Task<? extends Serializable> rootTask : rootTasks) {
- validate(rootTask);
+ validate(rootTask, reworkMapredWork);
}
}
- private void validate(Task<? extends Serializable> task)
+ private void validate(Task<? extends Serializable> task, boolean reworkMapredWork)
throws SemanticException {
+ Utilities.reworkMapRedWork(task, reworkMapredWork, conf);
if (task.getChildTasks() == null) {
return;
}
for (Task<? extends Serializable> childTask : task.getChildTasks()) {
- validate(childTask);
+ validate(childTask, reworkMapredWork);
}
}
Index: ql/src/test/org/apache/hadoop/hive/ql/io/TestSymlinkTextInputFormat.java
===================================================================
--- ql/src/test/org/apache/hadoop/hive/ql/io/TestSymlinkTextInputFormat.java (revision 1096548)
+++ ql/src/test/org/apache/hadoop/hive/ql/io/TestSymlinkTextInputFormat.java (working copy)
@@ -17,8 +17,12 @@
*/
package org.apache.hadoop.hive.ql.io;
+import java.io.File;
import java.io.IOException;
import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.net.URL;
+import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.List;
@@ -30,6 +34,22 @@
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStore;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.QueryPlan;
+import org.apache.hadoop.hive.ql.exec.ExecDriver;
+import org.apache.hadoop.hive.ql.exec.MapRedTask;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.parse.ParseDriver;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
@@ -37,6 +57,8 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
/**
* Unittest for SymlinkTextInputFormat.
@@ -61,8 +83,9 @@
conf = new Configuration();
job = new JobConf(conf);
fileSystem = FileSystem.getLocal(conf);
- testDir = new Path(System.getProperty("test.data.dir", ".") +
- "/TestSymlinkTextInputFormat");
+ testDir = new Path(System.getProperty("test.data.dir", System.getProperty(
+ "user.dir", new File(".").getAbsolutePath()))
+ + "/TestSymlinkTextInputFormat");
reporter = Reporter.NULL;
fileSystem.delete(testDir, true);
@@ -77,6 +100,101 @@
}
/**
+ * Test combining symlink text input files: two input dirs, each containing
+ * one file, and one symlink file that lists both of those files. Without
+ * combining, this would normally return at least 2 splits.
+ */
+ public void testCombine() throws Exception {
+ JobConf newJob = new JobConf(job);
+ FileSystem fs = dataDir1.getFileSystem(newJob);
+ int symbolLinkedFileSize = 0;
+
+ Path dir1_file1 = new Path(dataDir1, "combinefile1_1");
+ writeTextFile(dir1_file1,
+ "dir1_file1_line1\n" +
+ "dir1_file1_line2\n");
+
+ symbolLinkedFileSize += fs.getFileStatus(dir1_file1).getLen();
+
+ Path dir2_file1 = new Path(dataDir2, "combinefile2_1");
+ writeTextFile(dir2_file1,
+ "dir2_file1_line1\n" +
+ "dir2_file1_line2\n");
+
+ symbolLinkedFileSize += fs.getFileStatus(dir2_file1).getLen();
+
+ // A symlink file, contains first file from first dir and second file from
+ // second dir.
+ writeSymlinkFile(
+ new Path(symlinkDir, "symlink_file"),
+ new Path(dataDir1, "combinefile1_1"),
+ new Path(dataDir2, "combinefile2_1"));
+
+
+ HiveConf hiveConf = new HiveConf(TestSymlinkTextInputFormat.class);
+
+ HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_REWORK_MAPREDWORK, true);
+ HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+ Driver drv = new Driver(hiveConf);
+ drv.init();
+ String tblName = "text_symlink_text";
+
+ String createSymlinkTableCmd = "create table " + tblName + " (key int) stored as " +
+ " inputformat 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat' " +
+ " outputformat 'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'";
+
+ SessionState.start(hiveConf);
+
+ boolean tblCreated = false;
+ try {
+ int ecode = 0;
+ ecode = drv.run(createSymlinkTableCmd).getResponseCode();
+ if (ecode != 0) {
+ throw new Exception("Create table command: " + createSymlinkTableCmd
+ + " failed with exit code= " + ecode);
+ }
+ tblCreated = true;
+
+ String loadFileCommand = "LOAD DATA LOCAL INPATH '" +
+ new Path(symlinkDir, "symlink_file").toString() + "' INTO TABLE " + tblName;
+
+ ecode = drv.run(loadFileCommand).getResponseCode();
+ if (ecode != 0) {
+ throw new Exception("Load data command: " + loadFileCommand
+ + " failed with exit code= " + ecode);
+ }
+
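+ // Compiling the query runs the semantic analyzer's validate(), which, with
+ // hive.rework.mapredwork enabled, rewrites the plan so the symlink path is
+ // replaced by the real data files it points to.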
+ String cmd = "select key from " + tblName;
+ drv.compile(cmd);
+
+ //create scratch dir
+ String emptyScratchDirStr;
+ Path emptyScratchDir;
+ Context ctx = new Context(newJob);
+ emptyScratchDirStr = ctx.getMRTmpFileURI();
+ emptyScratchDir = new Path(emptyScratchDirStr);
+ FileSystem fileSys = emptyScratchDir.getFileSystem(newJob);
+ fileSys.mkdirs(emptyScratchDir);
+
+ QueryPlan plan = drv.getPlan();
+ MapRedTask selectTask = (MapRedTask)plan.getRootTasks().get(0);
+
+ ExecDriver.addInputPaths(newJob, selectTask.getWork(), emptyScratchDir.toString(), ctx);
+ Utilities.setMapRedWork(newJob, selectTask.getWork(), ctx.getMRTmpFileURI());
+
+ CombineHiveInputFormat combineInputFormat = ReflectionUtils.newInstance(
+ CombineHiveInputFormat.class, newJob);
+ InputSplit[] retSplits = combineInputFormat.getSplits(newJob, 1);
+ assertEquals(1, retSplits.length);
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail("Caught exception: " + e.getMessage());
+ } finally {
+ if (tblCreated) {
+ drv.run("drop table text_symlink_text;").getResponseCode();
+ }
+ }
+ }
+
+ /**
* Test scenario: Two data directories, one symlink file that contains two
* paths each point to a file in one of data directories.
*/