Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Fixed
Description
The following query fails if the datasets are created on a node group (in the DDL below, group1, which contains only asterix_nc1).
DDL:
drop dataverse tpch if exists;
create dataverse tpch;
use dataverse tpch;

create type LineItemType as closed {
  l_orderkey: int64,
  l_partkey: int64,
  l_suppkey: int64,
  l_linenumber: int64,
  l_quantity: int64,
  l_extendedprice: double,
  l_discount: double,
  l_tax: double,
  l_returnflag: string,
  l_linestatus: string,
  l_shipdate: string,
  l_commitdate: string,
  l_receiptdate: string,
  l_shipinstruct: string,
  l_shipmode: string,
  l_comment: string
}

create type OrderType as closed {
  o_orderkey: int64,
  o_custkey: int64,
  o_orderstatus: string,
  o_totalprice: double,
  o_orderdate: string,
  o_orderpriority: string,
  o_clerk: string,
  o_shippriority: int64,
  o_comment: string
}

create nodegroup group1 if not exists on asterix_nc1;
create dataset LineItem(LineItemType) primary key l_orderkey, l_linenumber on group1;
create dataset Orders(OrderType) primary key o_orderkey on group1;
Query:
use dataverse tpch;

declare function tmp() {
  for $l in dataset('LineItem')
  where $l.l_commitdate < $l.l_receiptdate
  distinct by $l.l_orderkey
  return { "o_orderkey": $l.l_orderkey }
}

for $o in dataset('Orders')
for $t in tmp()
where $o.o_orderkey = $t.o_orderkey
  and $o.o_orderdate >= '1993-07-01'
  and $o.o_orderdate < '1993-10-01'
group by $o_orderpriority := $o.o_orderpriority with $o
order by $o_orderpriority
return {
  "order_priority": $o_orderpriority,
  "count": count($o)
}
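For readers less familiar with AQL, the following minimal Java sketch shows what the query computes, using in-memory lists in place of the datasets (the query has the shape of TPC-H Q4). The record types, the reduced field set, and the sample rows are hypothetical; dates are compared as strings, exactly as the query does.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class QuerySemantics {
    // Hypothetical in-memory stand-ins for the LineItem and Orders datasets,
    // reduced to the fields the query reads.
    record LineItem(long orderKey, String commitDate, String receiptDate) {}
    record Order(long orderKey, String orderDate, String orderPriority) {}

    static TreeMap<String, Long> countByPriority(List<Order> orders, List<LineItem> lineItems) {
        // tmp(): distinct order keys of line items received after their commit date.
        Set<Long> lateOrderKeys = lineItems.stream()
                .filter(l -> l.commitDate().compareTo(l.receiptDate()) < 0)
                .map(LineItem::orderKey)
                .collect(Collectors.toSet());
        // Keep orders in [1993-07-01, 1993-10-01) that match a late order key,
        // then count them per priority; TreeMap keeps priorities sorted ("order by").
        // Because tmp() is distinct by order key, each order matches at most once.
        return orders.stream()
                .filter(o -> o.orderDate().compareTo("1993-07-01") >= 0
                        && o.orderDate().compareTo("1993-10-01") < 0)
                .filter(o -> lateOrderKeys.contains(o.orderKey()))
                .collect(Collectors.groupingBy(Order::orderPriority, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<LineItem> lineItems = List.of(
                new LineItem(1, "1993-07-10", "1993-07-20"),  // late
                new LineItem(1, "1993-07-10", "1993-07-25"),  // late, same order
                new LineItem(2, "1993-08-10", "1993-08-01"),  // on time
                new LineItem(3, "1993-08-10", "1993-08-15")); // late
        List<Order> orders = List.of(
                new Order(1, "1993-07-05", "1-URGENT"),
                new Order(2, "1993-08-01", "1-URGENT"),
                new Order(3, "1993-09-30", "2-HIGH"),
                new Order(4, "1993-11-01", "2-HIGH")); // outside the date range
        System.out.println(countByPriority(orders, lineItems)); // {1-URGENT=1, 2-HIGH=1}
    }
}
```

Using a set for tmp() mirrors the "distinct by" step, which is why the join behaves like a semi-join here.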
Exception:
Exception in thread "Thread-1" java.lang.AssertionError: Dependency activity partitioned differently from dependent: 4 != 2
	at org.apache.hyracks.control.cc.scheduler.ActivityClusterPlanner.buildActivityPlanMap(ActivityClusterPlanner.java:109)
	at org.apache.hyracks.control.cc.scheduler.ActivityClusterPlanner.planActivityCluster(ActivityClusterPlanner.java:71)
	at org.apache.hyracks.control.cc.scheduler.JobScheduler.findRunnableTaskClusterRoots(JobScheduler.java:139)
	at org.apache.hyracks.control.cc.scheduler.JobScheduler.findRunnableTaskClusterRoots(JobScheduler.java:118)
	at org.apache.hyracks.control.cc.scheduler.JobScheduler.findRunnableTaskClusterRoots(JobScheduler.java:108)
	at org.apache.hyracks.control.cc.scheduler.JobScheduler.startRunnableActivityClusters(JobScheduler.java:164)
	at org.apache.hyracks.control.cc.scheduler.JobScheduler.notifyTaskComplete(JobScheduler.java:617)
	at org.apache.hyracks.control.cc.work.TaskCompleteWork.performEvent(TaskCompleteWork.java:56)
	at org.apache.hyracks.control.cc.work.AbstractTaskLifecycleWork.runWork(AbstractTaskLifecycleWork.java:70)
	at org.apache.hyracks.control.cc.work.AbstractHeartbeatWork.doRun(AbstractHeartbeatWork.java:48)
	at org.apache.hyracks.control.common.work.SynchronizableWork.run(SynchronizableWork.java:36)
	at org.apache.hyracks.control.common.work.WorkQueue$WorkerThread.run(WorkQueue.java:132)
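The assertion in ActivityClusterPlanner.buildActivityPlanMap checks that a dependency activity and its dependent are partitioned the same way. The Java sketch below models that kind of check under one stated assumption: task i of a dependent activity waits on task i of each of its dependencies, so both sides need the same partition count. The Activity record, the findMismatches method, and the sample counts are hypothetical illustrations, not Hyracks APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class PartitionCheckSketch {
    // Hypothetical model of a scheduled activity (for example, one phase of a
    // blocking operator) and the number of parallel partitions it runs with.
    record Activity(String name, int partitionCount) {}

    // blockers maps each dependent activity to the activities it must wait for.
    // Assuming task i of a dependent waits on task i of each dependency, the two
    // sides need equal partition counts; mismatches are collected and returned.
    static List<String> findMismatches(Map<Activity, List<Activity>> blockers) {
        List<String> errors = new ArrayList<>();
        for (Map.Entry<Activity, List<Activity>> entry : blockers.entrySet()) {
            Activity dependent = entry.getKey();
            for (Activity dependency : entry.getValue()) {
                if (dependency.partitionCount() != dependent.partitionCount()) {
                    errors.add(dependency.name() + " has " + dependency.partitionCount()
                            + " partitions but " + dependent.name() + " has "
                            + dependent.partitionCount());
                }
            }
        }
        return errors;
    }

    public static void main(String[] args) {
        // Illustrative counts only; the report does not say which activities were involved.
        Activity blocking = new Activity("blocking-phase", 4);
        Activity dependent = new Activity("dependent-phase", 2);
        // prints [blocking-phase has 4 partitions but dependent-phase has 2]
        System.out.println(findMismatches(Map.of(dependent, List.of(blocking))));
    }
}
```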
Optimized plan:
distribute result [%0->$$25]
-- DISTRIBUTE_RESULT |PARTITIONED|
  exchange
  -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
    project ([$$25])
    -- STREAM_PROJECT |PARTITIONED|
      assign [$$25] <- [function-call: asterix:closed-record-constructor, Args:[AString: {order_priority}, %0->$$3, AString: {count}, %0->$$33]]
      -- ASSIGN |PARTITIONED|
        exchange
        -- SORT_MERGE_EXCHANGE [$$3(ASC) ] |PARTITIONED|
          group by ([$$3 := %0->$$39]) decor ([]) {
            aggregate [$$33] <- [function-call: asterix:agg-sum, Args:[%0->$$38]]
            -- AGGREGATE |LOCAL|
              nested tuple source
              -- NESTED_TUPLE_SOURCE |LOCAL|
          }
          -- PRE_CLUSTERED_GROUP_BY[$$39] |PARTITIONED|
            exchange
            -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$39(ASC)] HASH:[$$39] |PARTITIONED|
              group by ([$$39 := %0->$$27]) decor ([]) {
                aggregate [$$38] <- [function-call: asterix:agg-count, Args:[AInt64: {1}]]
                -- AGGREGATE |LOCAL|
                  nested tuple source
                  -- NESTED_TUPLE_SOURCE |LOCAL|
              }
              -- SORT_GROUP_BY[$$27] |PARTITIONED|
                exchange
                -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
                  project ([$$27])
                  -- STREAM_PROJECT |PARTITIONED|
                    exchange
                    -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
                      join (function-call: algebricks:eq, Args:[%0->$$30, %0->$$31])
                      -- HYBRID_HASH_JOIN [$$30][$$31] |PARTITIONED|
                        exchange
                        -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
                          project ([$$27, $$30])
                          -- STREAM_PROJECT |PARTITIONED|
                            select (function-call: algebricks:and, Args:[function-call: algebricks:lt, Args:[%0->$$29, AString: {1993-10-01}], function-call: algebricks:ge, Args:[%0->$$29, AString: {1993-07-01}]])
                            -- STREAM_SELECT |PARTITIONED|
                              project ([$$27, $$29, $$30])
                              -- STREAM_PROJECT |PARTITIONED|
                                assign [$$27, $$29] <- [function-call: asterix:field-access-by-index, Args:[%0->$$4, AInt32: {5}], function-call: asterix:field-access-by-index, Args:[%0->$$4, AInt32: {4}]]
                                -- ASSIGN |PARTITIONED|
                                  exchange
                                  -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
                                    data-scan []<-[$$30, $$4] <- tpch:Orders
                                    -- DATASOURCE_SCAN |PARTITIONED|
                                      exchange
                                      -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
                                        empty-tuple-source
                                        -- EMPTY_TUPLE_SOURCE |PARTITIONED|
                        exchange
                        -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
                          distinct ([%0->$$31])
                          -- PRE_SORTED_DISTINCT_BY |PARTITIONED|
                            exchange
                            -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$31(ASC)] HASH:[$$31] |PARTITIONED|
                              project ([$$31])
                              -- STREAM_PROJECT |PARTITIONED|
                                select (function-call: algebricks:lt, Args:[function-call: asterix:field-access-by-index, Args:[%0->$$5, AInt32: {11}], function-call: asterix:field-access-by-index, Args:[%0->$$5, AInt32: {12}]])
                                -- STREAM_SELECT |PARTITIONED|
                                  project ([$$5, $$31])
                                  -- STREAM_PROJECT |PARTITIONED|
                                    exchange
                                    -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
                                      data-scan []<-[$$31, $$32, $$5] <- tpch:LineItem
                                      -- DATASOURCE_SCAN |PARTITIONED|
                                        exchange
                                        -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
                                          empty-tuple-source
                                          -- EMPTY_TUPLE_SOURCE |PARTITIONED|
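The cause is that the optimized plan assumes the computation nodes are the same as the storage nodes, and therefore chooses the wrong exchange strategies. A ONE_TO_ONE_EXCHANGE is only valid when the producer and the consumer run with the same partitioning. In the plan, for example, the Orders branch reaches the HYBRID_HASH_JOIN only through ONE_TO_ONE_EXCHANGE connectors and so stays on the partitions of group1, whereas the LineItem branch passes through a HASH_PARTITION_MERGE_EXCHANGE; when the two sides end up with different partition counts, the scheduler reports a mismatch such as "4 != 2".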
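The exchange-strategy problem can be illustrated with a small Java sketch of choosing a connector once storage placement and compute placement are allowed to differ. It assumes each operator's placement is an ordered list of partition locations, and it shows only one possible remedy, not the fix applied for this issue: keep a one-to-one exchange only when producer and consumer have identical placements, and otherwise repartition. Placement, Exchange, choose, and location names such as asterix_nc2:0 are hypothetical. A real hash-partitioning exchange would also need the hash keys the consumer requires (for example, the join key), which this sketch omits.

```java
import java.util.List;

public class ExchangeChoiceSketch {
    enum Exchange { ONE_TO_ONE, HASH_PARTITION }

    // Hypothetical description of where an operator runs: an ordered list of
    // partition locations. For a dataset scan this would come from the dataset's
    // node group; for other operators, from the set of compute partitions.
    record Placement(List<String> partitions) {}

    // A one-to-one exchange wires producer partition i directly to consumer
    // partition i, so it is only safe when both placements are identical.
    // Otherwise the sketch falls back to a repartitioning (hash) exchange.
    static Exchange choose(Placement producer, Placement consumer) {
        return producer.equals(consumer) ? Exchange.ONE_TO_ONE : Exchange.HASH_PARTITION;
    }

    public static void main(String[] args) {
        Placement storage = new Placement(List.of("asterix_nc1:0", "asterix_nc1:1"));
        Placement compute = new Placement(List.of("asterix_nc1:0", "asterix_nc1:1",
                "asterix_nc2:0", "asterix_nc2:1"));
        System.out.println(choose(storage, storage)); // ONE_TO_ONE
        System.out.println(choose(storage, compute)); // HASH_PARTITION
    }
}
```

An alternative design would be to constrain downstream operators to the storage partitions instead of repartitioning; which one is preferable depends on how much parallelism the compute side should have.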