diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index f430a131af..9ea08d271e 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -168,6 +168,7 @@ minillap.query.files=\
   union_script.q,\
   vector_custom_udf_configure.q,\
   vector_offset_limit.q,\
+  vector_ptf_last_value_streaming.q,\
   vector_udf3.q,\
   whroot_external1.q
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFInvocationSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFInvocationSpec.java
index e385d4e755..38c130dc82 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFInvocationSpec.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFInvocationSpec.java
@@ -588,6 +588,19 @@ public String toString() {
       return String.format("%s %s %s", super.toString(), order, nullOrder);
     }
+
+    public void reverseOrder() {
+      if (this.order == Order.ASC) {
+        this.order = Order.DESC;
+      } else {
+        this.order = Order.ASC;
+      }
+      if (this.nullOrder == NullOrder.NULLS_FIRST) {
+        this.nullOrder = NullOrder.NULLS_LAST;
+      } else {
+        this.nullOrder = NullOrder.NULLS_FIRST;
+      }
+    }
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java
index b09f4c2a0b..c8958a7af7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java
@@ -74,6 +74,7 @@
 import org.apache.hadoop.hive.ql.plan.ptf.WindowTableFunctionDef;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFLeadLag;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFFirstValue;
 import org.apache.hadoop.hive.ql.udf.ptf.TableFunctionEvaluator;
 import org.apache.hadoop.hive.ql.udf.ptf.TableFunctionResolver;
 import org.apache.hadoop.hive.ql.udf.ptf.WindowingTableFunction.WindowingTableFunctionResolver;
@@ -142,6 +143,18 @@ public PTFDesc translate(WindowingSpec wdwSpec, SemanticAnalyzer semAly, HiveCon
       UnparseTranslator unparseT) throws SemanticException {
     init(semAly, hCfg, inputRR, unparseT);
+    for (int i = 0; i < wdwSpec.getWindowExpressions().size(); ++i) {
+      WindowExpressionSpec wes = wdwSpec.getWindowExpressions().get(i);
+      if (wes instanceof WindowFunctionSpec && ((WindowFunctionSpec) wes).getName().equals("last_value")) {
+        ((WindowFunctionSpec) wes).setName("first_value");
+        OrderSpec orderSpec = ((WindowFunctionSpec) wes).windowSpec.getOrder();
+        if (orderSpec != null) {
+          for (int j = 0; j < orderSpec.expressions.size(); ++j) {
+            orderSpec.expressions.get(j).reverseOrder();
+          }
+        }
+      }
+    }
     windowingSpec = wdwSpec;
     ptfDesc = new PTFDesc();
     ptfDesc.setCfg(hCfg);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingComponentizer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingComponentizer.java
index 1929deb89c..dd4abc4ab6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingComponentizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingComponentizer.java
@@ -26,6 +26,8 @@
 import org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.PartitioningSpec;
 import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowExpressionSpec;
 import org.apache.hadoop.hive.ql.parse.WindowingSpec.WindowFunctionSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /*
  * breakup the original WindowingSpec into a set of WindowingSpecs.
@@ -43,6 +45,7 @@
  */
 public class WindowingComponentizer {
+  private static final Logger LOG = LoggerFactory.getLogger(WindowingComponentizer.class.getName());
 
   WindowingSpec originalSpec;
   LinkedHashMap<PartitioningSpec, WindowingSpec> groups;
@@ -83,8 +86,8 @@ public WindowingSpec next(HiveConf hCfg,
       WindowingSpec wSpec = entry.getValue();
       try {
         PTFTranslator t = new PTFTranslator();
-        t.translate(wSpec, semAly, hCfg, inputRR, unparseT);
         groups.remove(entry.getKey());
+        t.translate(wSpec, semAly, hCfg, inputRR, unparseT);
         return wSpec;
       } catch (SemanticException se) {
         originalException = se;
diff --git a/ql/src/test/queries/clientpositive/vector_ptf_last_value_streaming.q b/ql/src/test/queries/clientpositive/vector_ptf_last_value_streaming.q
new file mode 100644
index 0000000000..5d156cd0cf
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/vector_ptf_last_value_streaming.q
@@ -0,0 +1,7 @@
+set hive.query.results.cache.enabled=false;
+create temporary table test2(id STRING,name STRING,event_dt date) stored as orc;
+
+insert into test2 values ('100','A','2019-08-15'), ('100','A','2019-10-12');
+
+explain vectorization detail SELECT name, event_dt, last_value(event_dt) over (PARTITION BY name ORDER BY event_dt desc ROWS BETWEEN unbounded preceding and unbounded following) last_event_dt FROM test2;
+SELECT name, event_dt, last_value(event_dt) over (PARTITION BY name ORDER BY event_dt desc ROWS BETWEEN unbounded preceding and unbounded following) last_event_dt FROM test2;
diff --git a/ql/src/test/results/clientpositive/llap/vector_ptf_last_value_streaming.q.out b/ql/src/test/results/clientpositive/llap/vector_ptf_last_value_streaming.q.out
new file mode 100644
index 0000000000..a4d4c88d44
--- /dev/null
+++ b/ql/src/test/results/clientpositive/llap/vector_ptf_last_value_streaming.q.out
@@ -0,0 +1,172 @@
+PREHOOK: query: create temporary table test2(id STRING,name STRING,event_dt date) stored as orc
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@test2
+POSTHOOK: query: create temporary table test2(id STRING,name STRING,event_dt date) stored as orc
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@test2
+PREHOOK: query: insert into test2 values ('100','A','2019-08-15'), ('100','A','2019-10-12')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@test2
+POSTHOOK: query: insert into test2 values ('100','A','2019-08-15'), ('100','A','2019-10-12')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@test2
+POSTHOOK: Lineage: test2.event_dt SCRIPT []
+POSTHOOK: Lineage: test2.id SCRIPT []
+POSTHOOK: Lineage: test2.name SCRIPT []
+PREHOOK: query: explain vectorization detail SELECT name, event_dt, last_value(event_dt) over (PARTITION BY name ORDER BY event_dt desc ROWS BETWEEN unbounded preceding and unbounded following) last_event_dt FROM test2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@test2
+#### A masked pattern was here ####
+POSTHOOK: query: explain vectorization detail SELECT name, event_dt, last_value(event_dt) over (PARTITION BY name ORDER BY event_dt desc ROWS BETWEEN unbounded preceding and unbounded following) last_event_dt FROM test2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@test2
+#### A masked pattern was here ####
+PLAN VECTORIZATION:
+  enabled: true
+  enabledConditionsMet: [hive.vectorized.execution.enabled IS true]
+
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Edges:
+        Reducer 2 <- Map 1 (SIMPLE_EDGE)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: test2
+                  Statistics: Num rows: 2 Data size: 480 Basic stats: COMPLETE Column stats: NONE
+                  TableScan Vectorization:
+                      native: true
+                      vectorizationSchemaColumns: [0:id:string, 1:name:string, 2:event_dt:date, 3:ROW__ID:struct<writeid:bigint,bucketid:int,rowid:bigint>]
+                  Reduce Output Operator
+                    key expressions: name (type: string), event_dt (type: date)
+                    null sort order: aa
+                    sort order: ++
+                    Map-reduce partition columns: name (type: string)
+                    Reduce Sink Vectorization:
+                        className: VectorReduceSinkObjectHashOperator
+                        keyColumns: 1:string, 2:date
+                        native: true
+                        nativeConditionsMet: hive.vectorized.execution.reducesink.new.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, No PTF TopN IS true, No DISTINCT columns IS true, BinarySortableSerDe for keys IS true, LazyBinarySerDe for values IS true
+                        partitionColumns: 1:string
+                    Statistics: Num rows: 2 Data size: 480 Basic stats: COMPLETE Column stats: NONE
+            Execution mode: vectorized, llap
+            LLAP IO: all inputs
+            Map Vectorization:
+                enabled: true
+                enabledConditionsMet: hive.vectorized.use.vectorized.input.format IS true
+                inputFormatFeatureSupport: [DECIMAL_64]
+                featureSupportInUse: [DECIMAL_64]
+                inputFileFormats: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+                allNative: true
+                usesVectorUDFAdaptor: false
+                vectorized: true
+                rowBatchContext:
+                    dataColumnCount: 3
+                    includeColumns: [1, 2]
+                    dataColumns: id:string, name:string, event_dt:date
+                    partitionColumnCount: 0
+                    scratchColumnTypeNames: []
+        Reducer 2 
+            Execution mode: vectorized, llap
+            Reduce Vectorization:
+                enabled: true
+                enableConditionsMet: hive.vectorized.execution.reduce.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true
+                reduceColumnNullOrder: aa
+                reduceColumnSortOrder: ++
+                allNative: false
+                usesVectorUDFAdaptor: false
+                vectorized: true
+                rowBatchContext:
+                    dataColumnCount: 2
+                    dataColumns: KEY.reducesinkkey0:string, KEY.reducesinkkey1:date
+                    partitionColumnCount: 0
+                    scratchColumnTypeNames: [bigint]
+            Reduce Operator Tree:
+              Select Operator
+                expressions: KEY.reducesinkkey0 (type: string), KEY.reducesinkkey1 (type: date)
+                outputColumnNames: _col1, _col2
+                Select Vectorization:
+                    className: VectorSelectOperator
+                    native: true
+                    projectedOutputColumnNums: [0, 1]
+                Statistics: Num rows: 2 Data size: 480 Basic stats: COMPLETE Column stats: NONE
+                PTF Operator
+                  Function definitions:
+                      Input definition
+                        input alias: ptf_0
+                        output shape: _col1: string, _col2: date
+                        type: WINDOWING
+                      Windowing table definition
+                        input alias: ptf_1
+                        name: windowingtablefunction
+                        order by: _col2 ASC NULLS FIRST
+                        partition by: _col1
+                        raw input shape:
+                        window functions:
+                            window function definition
+                              alias: last_value_window_0
+                              arguments: _col2
+                              name: first_value
+                              window function: GenericUDAFFirstValueEvaluator
+                              window frame: ROWS PRECEDING(MAX)~FOLLOWING(MAX)
+                  PTF Vectorization:
+                      className: VectorPTFOperator
+                      evaluatorClasses: [VectorPTFEvaluatorLongFirstValue]
+                      functionInputExpressions: [col 1:date]
+                      functionNames: [first_value]
+                      keyInputColumns: [0, 1]
+                      native: true
+                      nonKeyInputColumns: []
+                      orderExpressions: [col 1:date]
+                      outputColumns: [2, 0, 1]
+                      outputTypes: [date, string, date]
+                      partitionExpressions: [col 0:string]
+                      streamingColumns: [2]
+                  Statistics: Num rows: 2 Data size: 480 Basic stats: COMPLETE Column stats: NONE
+                  Select Operator
+                    expressions: _col1 (type: string), _col2 (type: date), last_value_window_0 (type: date)
+                    outputColumnNames: _col0, _col1, _col2
+                    Select Vectorization:
+                        className: VectorSelectOperator
+                        native: true
+                        projectedOutputColumnNums: [0, 1, 2]
+                    Statistics: Num rows: 2 Data size: 480 Basic stats: COMPLETE Column stats: NONE
+                    File Output Operator
+                      compressed: false
+                      File Sink Vectorization:
+                          className: VectorFileSinkOperator
+                          native: false
+                      Statistics: Num rows: 2 Data size: 480 Basic stats: COMPLETE Column stats: NONE
+                      table:
+                          input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                          output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                          serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: SELECT name, event_dt, last_value(event_dt) over (PARTITION BY name ORDER BY event_dt desc ROWS BETWEEN unbounded preceding and unbounded following) last_event_dt FROM test2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@test2
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT name, event_dt, last_value(event_dt) over (PARTITION BY name ORDER BY event_dt desc ROWS BETWEEN unbounded preceding and unbounded following) last_event_dt FROM test2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@test2
+#### A masked pattern was here ####
+A	2019-08-15	2019-08-15
+A	2019-10-12	2019-08-15
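
For reference, the patch rests on a simple equivalence: over a frame of ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING, the last row in a given sort order is the first row in the reversed sort order. PTFTranslator.translate() therefore renames last_value to first_value and flips each ORDER BY expression via OrderExpression.reverseOrder(), which lets the vectorized PTF operator use the streaming first_value evaluator (VectorPTFEvaluatorLongFirstValue in the plan above). A minimal HiveQL sketch of the two forms, using the test2 table from the new q file (illustration only, not part of the patch):

-- As written by the user: last_value over a DESC order, unbounded frame.
SELECT name, event_dt,
  last_value(event_dt) OVER (PARTITION BY name ORDER BY event_dt DESC
    ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS last_event_dt
FROM test2;

-- Equivalent form after the rewrite: first_value with the order direction
-- reversed (DESC -> ASC) and the null order flipped (NULLS LAST -> NULLS FIRST),
-- matching the plan's "order by: _col2 ASC NULLS FIRST" and "name: first_value".
SELECT name, event_dt,
  first_value(event_dt) OVER (PARTITION BY name ORDER BY event_dt ASC NULLS FIRST
    ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS last_event_dt
FROM test2;

-- Both queries return 2019-08-15 as last_event_dt for both rows of partition 'A'.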