diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 4e5df4c..292b1f0 100644
--- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -766,6 +766,9 @@
HIVEOUTERJOINSUPPORTSFILTERS("hive.outerjoin.supports.filters", true),
+    // 0 disables; -1 derives the reducer count from hive.exec.reducers.bytes.per.reducer
+ HIVEPARALLELORDERBYBUCKETINGNUM("hive.parallel.orderby.bucketing.num", 0),
+
// 'minimal', 'more' (and 'all' later)
HIVEFETCHTASKCONVERSION("hive.fetch.task.conversion", "minimal",
new StringsValidator("minimal", "more")),
diff --git conf/hive-default.xml.template conf/hive-default.xml.template
index 7b53239..9f6b30f 100644
--- conf/hive-default.xml.template
+++ conf/hive-default.xml.template
@@ -1989,6 +1989,18 @@
+<property>
+  <name>hive.parallel.orderby.bucketing.num</name>
+  <value>0</value>
+  <description>
+    Generally in Hive, an ORDER BY clause is executed by a single reducer. But if the query only fetches results,
+    rather than inserting into a table or partition, a fully ordered stream can be produced in the fetch task by
+    merging partially sorted streams from multiple reducers.
+    This configuration sets the number of reducers for the final MapReduce task of the ORDER BY.
+    -1 means it is decided by the usual calculation (based on hive.exec.reducers.bytes.per.reducer); 0 disables this.
+  </description>
+</property>
+
  <name>hive.fetch.task.conversion</name>
  <value>minimal</value>
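
For reference (not part of the patch): the new knob is read like any other integer ConfVar. A minimal
sketch, assuming only the HIVEPARALLELORDERBYBUCKETINGNUM constant added in the HiveConf hunk above;
the surrounding class and messages are invented for the example.

import org.apache.hadoop.hive.conf.HiveConf;

// Illustrative only: how a caller might consult the new setting.
public class ParallelOrderByConfExample {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    int buckets = conf.getIntVar(HiveConf.ConfVars.HIVEPARALLELORDERBYBUCKETINGNUM);
    if (buckets == 0) {
      System.out.println("parallel ORDER BY disabled; single-reducer plan (the default)");
    } else if (buckets == -1) {
      System.out.println("reducer count estimated from hive.exec.reducers.bytes.per.reducer");
    } else {
      System.out.println("last-stage ORDER BY will use " + buckets + " reducers");
    }
  }
}
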
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
index 6daf199..f6b0491 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
@@ -34,10 +34,8 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
-import org.apache.hadoop.hive.common.ObjectPair;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
-import org.apache.hadoop.hive.ql.exec.FooterBuffer;
import org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
import org.apache.hadoop.hive.ql.io.HiveRecordReader;
@@ -48,7 +46,6 @@
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
-import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.DelegatedObjectInspectorFactory;
@@ -59,11 +56,8 @@
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils.PrimitiveTypeEntry;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.InputFormat;
@@ -76,7 +70,7 @@
/**
* FetchTask implementation.
**/
-public class FetchOperator implements Serializable {
+public class FetchOperator implements RowFetcher, Serializable {
static Log LOG = LogFactory.getLog(FetchOperator.class.getName());
static LogHelper console = new LogHelper(LOG);
@@ -330,7 +324,7 @@ private void getNextPath() throws Exception {
if (isNativeTable) {
FileSystem fs = currPath.getFileSystem(job);
if (fs.exists(currPath)) {
- FileStatus[] fStats = listStatusUnderPath(fs, currPath);
+ FileStatus[] fStats = listStatusUnderPath(fs, currPath, job);
for (FileStatus fStat : fStats) {
if (fStat.getLen() > 0) {
tblDataDone = true;
@@ -365,7 +359,7 @@ private void getNextPath() throws Exception {
}
FileSystem fs = nxt.getFileSystem(job);
if (fs.exists(nxt)) {
- FileStatus[] fStats = listStatusUnderPath(fs, nxt);
+ FileStatus[] fStats = listStatusUnderPath(fs, nxt, job);
for (FileStatus fStat : fStats) {
if (fStat.getLen() > 0) {
currPath = nxt;
@@ -540,6 +534,14 @@ public boolean pushRow() throws IOException, HiveException {
return row != null;
}
+ public ObjectInspector setupFetchContext() throws HiveException {
+ return getOutputObjectInspector();
+ }
+
+ public InspectableObject fetchRow() throws IOException {
+ return getNextRow();
+ }
+
protected void pushRow(InspectableObject row) throws HiveException {
operator.processOp(row.o, 0);
}
@@ -742,8 +744,10 @@ public ObjectInspector getOutputObjectInspector() throws HiveException {
*
* @return list of file status entries
*/
- private FileStatus[] listStatusUnderPath(FileSystem fs, Path p) throws IOException {
- boolean recursive = HiveConf.getBoolVar(job, HiveConf.ConfVars.HADOOPMAPREDINPUTDIRRECURSIVE);
+ public static FileStatus[] listStatusUnderPath(FileSystem fs, Path p, JobConf job)
+ throws IOException {
+ HiveConf hiveConf = new HiveConf(job, FetchOperator.class);
+ boolean recursive = hiveConf.getBoolVar(HiveConf.ConfVars.HADOOPMAPREDINPUTDIRRECURSIVE);
if (!recursive) {
return fs.listStatus(p);
}
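
FetchOperator now also implements a new RowFetcher interface via the two methods added above
(setupFetchContext/fetchRow). The interface source is not part of this hunk; the following is a
hypothetical sketch of the contract those signatures imply, not the patch's actual file.

import java.io.IOException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

// Assumed shape only; the real interface added by the patch may declare more (e.g. a cleanup hook).
public interface RowFetcher {
  // prepare the fetch and expose the shape of the rows to the consumer
  ObjectInspector setupFetchContext() throws HiveException;
  // return the next row, or null once the source is exhausted
  InspectableObject fetchRow() throws IOException, HiveException;
}
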
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
index 770d904..6bbcd84 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
@@ -35,7 +35,10 @@
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.StringUtils;
@@ -46,9 +49,8 @@
private static final long serialVersionUID = 1L;
private int maxRows = 100;
- private FetchOperator fetch;
- private ListSinkOperator sink;
private int totalRows;
+ private RowProcessor processor;
private static transient final Log LOG = LogFactory.getLog(FetchTask.class);
@@ -74,10 +76,7 @@ public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext ctx) {
// push down filters
HiveInputFormat.pushFilters(job, ts);
}
- sink = work.getSink();
- fetch = new FetchOperator(work, job, source, getVirtualColumns(source));
- source.initialize(conf, new ObjectInspector[]{fetch.getOutputObjectInspector()});
- totalRows = 0;
+ processor = createProcessor(work, conf, job);
} catch (Exception e) {
// Bail out ungracefully - we should never hit
@@ -87,6 +86,70 @@ public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext ctx) {
}
}
+ private RowProcessor createProcessor(FetchWork work, HiveConf conf, JobConf job)
+ throws HiveException {
+    final Operator<?> source = work.getSource();
+
+ RowFetcher fetcher;
+ if (!work.isMergeFetcher()) {
+ fetcher = new FetchOperator(work, job, source, getVirtualColumns(source));
+ } else {
+ fetcher = new MergeSortingFetcher(work, job) {
+ private boolean initialized;
+ public int compare(List
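
When FetchWork is flagged as a merge fetcher, FetchTask builds a MergeSortingFetcher over the
partially sorted reducer outputs instead of a plain FetchOperator. The sketch below illustrates only
the general k-way merge idea with a standard priority queue; the class, names, and structure are
invented for illustration and are not the patch's MergeSortingFetcher.

import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Merges k already-sorted iterators into one totally ordered sequence.
public final class SortedStreamMerger<T> {

  private static final class Head<T> {
    T value;                 // current smallest unconsumed element of this stream
    final Iterator<T> rest;  // the remainder of the stream
    Head(T value, Iterator<T> rest) { this.value = value; this.rest = rest; }
  }

  private final PriorityQueue<Head<T>> heap;

  public SortedStreamMerger(List<Iterator<T>> sortedStreams, final Comparator<T> cmp) {
    heap = new PriorityQueue<Head<T>>(Math.max(1, sortedStreams.size()),
        new Comparator<Head<T>>() {
          public int compare(Head<T> a, Head<T> b) {
            return cmp.compare(a.value, b.value);
          }
        });
    for (Iterator<T> it : sortedStreams) {
      if (it.hasNext()) {
        heap.add(new Head<T>(it.next(), it));
      }
    }
  }

  // Returns the globally next element, or null when every stream is exhausted.
  public T next() {
    Head<T> min = heap.poll();
    if (min == null) {
      return null;
    }
    T out = min.value;
    if (min.rest.hasNext()) {
      min.value = min.rest.next();
      heap.add(min);  // re-insert with its new head element
    }
    return out;
  }
}

In the patch itself the comparator presumably comes from the ORDER BY key columns (the overridden
compare(List...) above), and the per-reducer streams from the files located through the now-public
listStatusUnderPath.
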