diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 10b364a..7b08050 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -3034,6 +3034,9 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal -1f, "The customized fraction of JVM memory which Tez will reserve for the processor"), TEZ_CARTESIAN_PRODUCT_EDGE_ENABLED("hive.tez.cartesian-product.enabled", false, "Use Tez cartesian product edge to speed up cross product"), + TEZ_ORDERBY_PARALLELISM("hive.tez.orderby.parallelism", 1, "Increase this to use parallel " + + "sorting feature from Tez. It samples data and do range partitioning on the fly so that we " + + "have multiple reducers. Each reducer output one sorted partition"), // The default is different on the client and server, so it's null here. LLAP_IO_ENABLED("hive.llap.io.enabled", null, "Whether the LLAP IO layer is enabled."), LLAP_IO_TRACE_SIZE("hive.llap.io.trace.size", "2Mb", diff --git a/data/conf/tez/tez-site.xml b/data/conf/tez/tez-site.xml index c575544..d47741e 100644 --- a/data/conf/tez/tez-site.xml +++ b/data/conf/tez/tez-site.xml @@ -7,4 +7,8 @@ tez.am.dag.scheduler.class org.apache.tez.dag.app.dag.impl.DAGSchedulerNaturalOrderControlled + + tez.am.task.reschedule.higher.priority + false + diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties index 9b0bace..27d8d43 100644 --- a/itests/src/test/resources/testconfiguration.properties +++ b/itests/src/test/resources/testconfiguration.properties @@ -62,6 +62,7 @@ minitez.query.files=acid_vectorization_original_tez.q,\ hybridgrace_hashjoin_1.q,\ hybridgrace_hashjoin_2.q,\ multi_count_distinct.q,\ + orderby.q,\ tez-tag.q,\ tez_union_with_udf.q diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java index 5c338b8..6f07fdf 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java @@ -49,6 +49,9 @@ import org.apache.tez.runtime.library.api.Partitioner; import org.apache.tez.runtime.library.cartesianproduct.CartesianProductConfig; import org.apache.tez.runtime.library.cartesianproduct.CartesianProductEdgeManager; +import org.apache.tez.runtime.library.input.OrderedGroupedKVInput; +import org.apache.tez.runtime.library.output.OrderedPartitionedSamplingKVOutput; +import org.apache.tez.runtime.library.partitioner.SamplingBasedRangePartitioner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -354,6 +357,11 @@ public Edge createEdge(JobConf vConf, Vertex v, Vertex w, TezEdgeProperty edgePr case SIMPLE_EDGE: { setupAutoReducerParallelism(edgeProp, w); + if (edgeProp.isOrderBy() && w.getParallelism() > 1) { + v.enableSampler(); + v.setConf(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, + vConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS)); + } break; } case CUSTOM_SIMPLE_EDGE: { @@ -445,15 +453,31 @@ private EdgeProperty createEdgeProperty(Vertex w, TezEdgeProperty edgeProp, // fallthrough default: assert partitionerClassName != null; - partitionerConf = createPartitionerConf(partitionerClassName, conf); - OrderedPartitionedKVEdgeConfig et5Conf = OrderedPartitionedKVEdgeConfig + if (edgeProp.isOrderBy() && w.getParallelism() > 1) { + Configuration inputConf = new TezConfiguration(conf), outputConf = new TezConfiguration(conf); + inputConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, keyClass); + inputConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, valClass); + outputConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS, SamplingBasedRangePartitioner.class.getName()); + outputConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, keyClass); + outputConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, valClass); + + return EdgeProperty.create(EdgeProperty.DataMovementType.SCATTER_GATHER, + EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL, + OutputDescriptor.create(OrderedPartitionedSamplingKVOutput.class.getName()) + .setUserPayload(TezUtils.createUserPayloadFromConf(outputConf)), + InputDescriptor.create(OrderedGroupedKVInput.class.getName()) + .setUserPayload(TezUtils.createUserPayloadFromConf(inputConf))); + } else { + partitionerConf = createPartitionerConf(partitionerClassName, conf); + OrderedPartitionedKVEdgeConfig et5Conf = OrderedPartitionedKVEdgeConfig .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf) .setFromConfiguration(conf) .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), - TezBytesComparator.class.getName(), null) + TezBytesComparator.class.getName(), null) .setValueSerializationClass(TezBytesWritableSerialization.class.getName(), null) .build(); - return et5Conf.createDefaultEdgeProperty(); + return et5Conf.createDefaultEdgeProperty(); + } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java index 01cb2b3..25b0c62 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java @@ -142,6 +142,7 @@ public static ReduceWork createReduceWork( edgeProp = new TezEdgeProperty(edgeType); edgeProp.setSlowStart(reduceWork.isSlowStart()); } + edgeProp.setOrderBy(reduceSink.getConf().hasOrderBy()); reduceWork.setEdgePropRef(edgeProp); tezWork.connect( diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 7a7460e..c72e541 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -7918,7 +7918,9 @@ private Operator genReduceSinkPlan(String dest, QB qb, Operator input, if (sortExprs == null) { sortExprs = qb.getParseInfo().getOrderByForClause(dest); if (sortExprs != null) { - assert numReducers == 1; + if (!HiveConf.getVar(conf, ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) { + assert numReducers == 1; + } // in strict mode, in the presence of order by, limit must be specified if (qb.getParseInfo().getDestLimit(dest) == null) { String error = StrictChecks.checkNoLimit(conf); @@ -10056,6 +10058,9 @@ private Operator genPostGroupByBodyPlan(Operator curr, String dest, QB qb, // Use only 1 reducer if order by is present if (hasOrderBy) { numReducers = 1; + if (HiveConf.getVar(conf, ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) { + numReducers = HiveConf.getIntVar(conf, ConfVars.TEZ_ORDERBY_PARALLELISM); + } } curr = genReduceSinkPlan(dest, qb, curr, numReducers, hasOrderBy); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java index d43b81a..cf51064 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java @@ -42,6 +42,16 @@ private int maxReducer; private long inputSizePerReducer; + public boolean isOrderBy() { + return isOrderBy; + } + + public void setOrderBy(boolean orderBy) { + isOrderBy = orderBy; + } + + private boolean isOrderBy; + public TezEdgeProperty(HiveConf hiveConf, EdgeType edgeType, int buckets) { this.hiveConf = hiveConf; diff --git a/ql/src/test/queries/clientpositive/orderby.q b/ql/src/test/queries/clientpositive/orderby.q new file mode 100644 index 0000000..28dcecb --- /dev/null +++ b/ql/src/test/queries/clientpositive/orderby.q @@ -0,0 +1,4 @@ +set hive.tez.orderby.parallelism=10; + +select * from src order by key; + diff --git a/ql/src/test/results/clientpositive/tez/orderby.q.out b/ql/src/test/results/clientpositive/tez/orderby.q.out new file mode 100644 index 0000000..0080718 --- /dev/null +++ b/ql/src/test/results/clientpositive/tez/orderby.q.out @@ -0,0 +1,508 @@ +PREHOOK: query: select * from src order by key +PREHOOK: type: QUERY +PREHOOK: Input: default@src +#### A masked pattern was here #### +POSTHOOK: query: select * from src order by key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +#### A masked pattern was here #### +0 val_0 +0 val_0 +0 val_0 +10 val_10 +100 val_100 +100 val_100 +103 val_103 +103 val_103 +104 val_104 +104 val_104 +105 val_105 +11 val_11 +111 val_111 +113 val_113 +113 val_113 +114 val_114 +116 val_116 +118 val_118 +118 val_118 +119 val_119 +119 val_119 +119 val_119 +12 val_12 +12 val_12 +120 val_120 +120 val_120 +125 val_125 +125 val_125 +126 val_126 +128 val_128 +128 val_128 +128 val_128 +129 val_129 +129 val_129 +131 val_131 +133 val_133 +134 val_134 +134 val_134 +136 val_136 +137 val_137 +137 val_137 +138 val_138 +138 val_138 +138 val_138 +138 val_138 +143 val_143 +145 val_145 +146 val_146 +146 val_146 +149 val_149 +149 val_149 +15 val_15 +15 val_15 +150 val_150 +152 val_152 +152 val_152 +153 val_153 +155 val_155 +156 val_156 +157 val_157 +158 val_158 +160 val_160 +162 val_162 +163 val_163 +164 val_164 +164 val_164 +165 val_165 +165 val_165 +166 val_166 +167 val_167 +167 val_167 +167 val_167 +168 val_168 +169 val_169 +169 val_169 +169 val_169 +169 val_169 +17 val_17 +170 val_170 +172 val_172 +172 val_172 +174 val_174 +174 val_174 +175 val_175 +175 val_175 +176 val_176 +176 val_176 +177 val_177 +178 val_178 +179 val_179 +179 val_179 +18 val_18 +18 val_18 +180 val_180 +181 val_181 +183 val_183 +186 val_186 +187 val_187 +187 val_187 +187 val_187 +189 val_189 +19 val_19 +190 val_190 +191 val_191 +191 val_191 +192 val_192 +193 val_193 +193 val_193 +193 val_193 +194 val_194 +195 val_195 +195 val_195 +196 val_196 +197 val_197 +197 val_197 +199 val_199 +199 val_199 +199 val_199 +2 val_2 +20 val_20 +200 val_200 +200 val_200 +201 val_201 +202 val_202 +203 val_203 +203 val_203 +205 val_205 +205 val_205 +207 val_207 +207 val_207 +208 val_208 +208 val_208 +208 val_208 +209 val_209 +209 val_209 +213 val_213 +213 val_213 +214 val_214 +216 val_216 +216 val_216 +217 val_217 +217 val_217 +218 val_218 +219 val_219 +219 val_219 +221 val_221 +221 val_221 +222 val_222 +223 val_223 +223 val_223 +224 val_224 +224 val_224 +226 val_226 +228 val_228 +229 val_229 +229 val_229 +230 val_230 +230 val_230 +230 val_230 +230 val_230 +230 val_230 +233 val_233 +233 val_233 +235 val_235 +237 val_237 +237 val_237 +238 val_238 +238 val_238 +239 val_239 +239 val_239 +24 val_24 +24 val_24 +241 val_241 +242 val_242 +242 val_242 +244 val_244 +247 val_247 +248 val_248 +249 val_249 +252 val_252 +255 val_255 +255 val_255 +256 val_256 +256 val_256 +257 val_257 +258 val_258 +26 val_26 +26 val_26 +260 val_260 +262 val_262 +263 val_263 +265 val_265 +265 val_265 +266 val_266 +27 val_27 +272 val_272 +272 val_272 +273 val_273 +273 val_273 +273 val_273 +274 val_274 +275 val_275 +277 val_277 +277 val_277 +277 val_277 +277 val_277 +278 val_278 +278 val_278 +28 val_28 +280 val_280 +280 val_280 +281 val_281 +281 val_281 +282 val_282 +282 val_282 +283 val_283 +284 val_284 +285 val_285 +286 val_286 +287 val_287 +288 val_288 +288 val_288 +289 val_289 +291 val_291 +292 val_292 +296 val_296 +298 val_298 +298 val_298 +298 val_298 +30 val_30 +302 val_302 +305 val_305 +306 val_306 +307 val_307 +307 val_307 +308 val_308 +309 val_309 +309 val_309 +310 val_310 +311 val_311 +311 val_311 +311 val_311 +315 val_315 +316 val_316 +316 val_316 +316 val_316 +317 val_317 +317 val_317 +318 val_318 +318 val_318 +318 val_318 +321 val_321 +321 val_321 +322 val_322 +322 val_322 +323 val_323 +325 val_325 +325 val_325 +327 val_327 +327 val_327 +327 val_327 +33 val_33 +331 val_331 +331 val_331 +332 val_332 +333 val_333 +333 val_333 +335 val_335 +336 val_336 +338 val_338 +339 val_339 +34 val_34 +341 val_341 +342 val_342 +342 val_342 +344 val_344 +344 val_344 +345 val_345 +348 val_348 +348 val_348 +348 val_348 +348 val_348 +348 val_348 +35 val_35 +35 val_35 +35 val_35 +351 val_351 +353 val_353 +353 val_353 +356 val_356 +360 val_360 +362 val_362 +364 val_364 +365 val_365 +366 val_366 +367 val_367 +367 val_367 +368 val_368 +369 val_369 +369 val_369 +369 val_369 +37 val_37 +37 val_37 +373 val_373 +374 val_374 +375 val_375 +377 val_377 +378 val_378 +379 val_379 +382 val_382 +382 val_382 +384 val_384 +384 val_384 +384 val_384 +386 val_386 +389 val_389 +392 val_392 +393 val_393 +394 val_394 +395 val_395 +395 val_395 +396 val_396 +396 val_396 +396 val_396 +397 val_397 +397 val_397 +399 val_399 +399 val_399 +4 val_4 +400 val_400 +401 val_401 +401 val_401 +401 val_401 +401 val_401 +401 val_401 +402 val_402 +403 val_403 +403 val_403 +403 val_403 +404 val_404 +404 val_404 +406 val_406 +406 val_406 +406 val_406 +406 val_406 +407 val_407 +409 val_409 +409 val_409 +409 val_409 +41 val_41 +411 val_411 +413 val_413 +413 val_413 +414 val_414 +414 val_414 +417 val_417 +417 val_417 +417 val_417 +418 val_418 +419 val_419 +42 val_42 +42 val_42 +421 val_421 +424 val_424 +424 val_424 +427 val_427 +429 val_429 +429 val_429 +43 val_43 +430 val_430 +430 val_430 +430 val_430 +431 val_431 +431 val_431 +431 val_431 +432 val_432 +435 val_435 +436 val_436 +437 val_437 +438 val_438 +438 val_438 +438 val_438 +439 val_439 +439 val_439 +44 val_44 +443 val_443 +444 val_444 +446 val_446 +448 val_448 +449 val_449 +452 val_452 +453 val_453 +454 val_454 +454 val_454 +454 val_454 +455 val_455 +457 val_457 +458 val_458 +458 val_458 +459 val_459 +459 val_459 +460 val_460 +462 val_462 +462 val_462 +463 val_463 +463 val_463 +466 val_466 +466 val_466 +466 val_466 +467 val_467 +468 val_468 +468 val_468 +468 val_468 +468 val_468 +469 val_469 +469 val_469 +469 val_469 +469 val_469 +469 val_469 +47 val_47 +470 val_470 +472 val_472 +475 val_475 +477 val_477 +478 val_478 +478 val_478 +479 val_479 +480 val_480 +480 val_480 +480 val_480 +481 val_481 +482 val_482 +483 val_483 +484 val_484 +485 val_485 +487 val_487 +489 val_489 +489 val_489 +489 val_489 +489 val_489 +490 val_490 +491 val_491 +492 val_492 +492 val_492 +493 val_493 +494 val_494 +495 val_495 +496 val_496 +497 val_497 +498 val_498 +498 val_498 +498 val_498 +5 val_5 +5 val_5 +5 val_5 +51 val_51 +51 val_51 +53 val_53 +54 val_54 +57 val_57 +58 val_58 +58 val_58 +64 val_64 +65 val_65 +66 val_66 +67 val_67 +67 val_67 +69 val_69 +70 val_70 +70 val_70 +70 val_70 +72 val_72 +72 val_72 +74 val_74 +76 val_76 +76 val_76 +77 val_77 +78 val_78 +8 val_8 +80 val_80 +82 val_82 +83 val_83 +83 val_83 +84 val_84 +84 val_84 +85 val_85 +86 val_86 +87 val_87 +9 val_9 +90 val_90 +90 val_90 +90 val_90 +92 val_92 +95 val_95 +95 val_95 +96 val_96 +97 val_97 +97 val_97 +98 val_98 +98 val_98