Uploaded image for project: 'Apache AsterixDB'
  1. Apache AsterixDB
  2. ASTERIXDB-1343

Queries over nodegroup-based datasets do not work

Attach files | Attach Screenshot | Voters | Watch issue | Watchers | Create sub-task | Link | Clone | Update Comment Author | Replace String in Comment | Update Comment Visibility | Delete Comments
    XML | Word | Printable | JSON

Details

    • Bug
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • None
    • None
    • None
    • None

    Description

      The following query fails if the datasets are created on a node group.

      DDL:

      drop dataverse tpch if exists;
      create dataverse tpch;
      
      use dataverse tpch;
      
      create type LineItemType as closed {
        l_orderkey: int64,
        l_partkey: int64,
        l_suppkey: int64,
        l_linenumber: int64,
        l_quantity: int64,
        l_extendedprice: double,
        l_discount: double,
        l_tax: double,
        l_returnflag: string,
        l_linestatus: string,
        l_shipdate: string,
        l_commitdate: string,
        l_receiptdate: string,
        l_shipinstruct: string,
        l_shipmode: string,
        l_comment: string
      }
      
      create type OrderType as closed {
        o_orderkey: int64,
        o_custkey: int64,
        o_orderstatus: string,
        o_totalprice: double,
        o_orderdate: string,
        o_orderpriority: string,
        o_clerk: string,
        o_shippriority: int64,
        o_comment: string
      }
      
      create  nodegroup group1 if not exists  on
          asterix_nc1;
      
      create dataset LineItem(LineItemType)
        primary key l_orderkey, l_linenumber on group1;
      create dataset Orders(OrderType)
        primary key o_orderkey on group1;
      

      Query:

      use dataverse tpch;
      
      declare function tmp()
      {
        for $l in dataset('LineItem')
        where $l.l_commitdate < $l.l_receiptdate
        distinct by $l.l_orderkey
        return { "o_orderkey": $l.l_orderkey }
      }
      
      for $o in dataset('Orders')
      for $t in tmp()
      where $o.o_orderkey = $t.o_orderkey and 
        $o.o_orderdate >= '1993-07-01' and $o.o_orderdate < '1993-10-01' 
      group by $o_orderpriority := $o.o_orderpriority with $o
      order by $o_orderpriority
      return {
        "order_priority": $o_orderpriority,
        "count": count($o)
      }
      

      Exception:

      Exception in thread "Thread-1" java.lang.AssertionError: Dependency activity partitioned differently from dependent: 4 != 2
      	at org.apache.hyracks.control.cc.scheduler.ActivityClusterPlanner.buildActivityPlanMap(ActivityClusterPlanner.java:109)
      	at org.apache.hyracks.control.cc.scheduler.ActivityClusterPlanner.planActivityCluster(ActivityClusterPlanner.java:71)
      	at org.apache.hyracks.control.cc.scheduler.JobScheduler.findRunnableTaskClusterRoots(JobScheduler.java:139)
      	at org.apache.hyracks.control.cc.scheduler.JobScheduler.findRunnableTaskClusterRoots(JobScheduler.java:118)
      	at org.apache.hyracks.control.cc.scheduler.JobScheduler.findRunnableTaskClusterRoots(JobScheduler.java:108)
      	at org.apache.hyracks.control.cc.scheduler.JobScheduler.startRunnableActivityClusters(JobScheduler.java:164)
      	at org.apache.hyracks.control.cc.scheduler.JobScheduler.notifyTaskComplete(JobScheduler.java:617)
      	at org.apache.hyracks.control.cc.work.TaskCompleteWork.performEvent(TaskCompleteWork.java:56)
      	at org.apache.hyracks.control.cc.work.AbstractTaskLifecycleWork.runWork(AbstractTaskLifecycleWork.java:70)
      	at org.apache.hyracks.control.cc.work.AbstractHeartbeatWork.doRun(AbstractHeartbeatWork.java:48)
      	at org.apache.hyracks.control.common.work.SynchronizableWork.run(SynchronizableWork.java:36)
      	at org.apache.hyracks.control.common.work.WorkQueue$WorkerThread.run(WorkQueue.java:132)
      
      distribute result [%0->$$25]
      -- DISTRIBUTE_RESULT  |PARTITIONED|
        exchange 
        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
          project ([$$25])
          -- STREAM_PROJECT  |PARTITIONED|
            assign [$$25] <- [function-call: asterix:closed-record-constructor, Args:[AString: {order_priority}, %0->$$3, AString: {count}, %0->$$33]]
            -- ASSIGN  |PARTITIONED|
              exchange 
              -- SORT_MERGE_EXCHANGE [$$3(ASC) ]  |PARTITIONED|
                group by ([$$3 := %0->$$39]) decor ([]) {
                          aggregate [$$33] <- [function-call: asterix:agg-sum, Args:[%0->$$38]]
                          -- AGGREGATE  |LOCAL|
                            nested tuple source
                            -- NESTED_TUPLE_SOURCE  |LOCAL|
                       }
                -- PRE_CLUSTERED_GROUP_BY[$$39]  |PARTITIONED|
                  exchange 
                  -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$39(ASC)] HASH:[$$39]  |PARTITIONED|
                    group by ([$$39 := %0->$$27]) decor ([]) {
                              aggregate [$$38] <- [function-call: asterix:agg-count, Args:[AInt64: {1}]]
                              -- AGGREGATE  |LOCAL|
                                nested tuple source
                                -- NESTED_TUPLE_SOURCE  |LOCAL|
                           }
                    -- SORT_GROUP_BY[$$27]  |PARTITIONED|
                      exchange 
                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                        project ([$$27])
                        -- STREAM_PROJECT  |PARTITIONED|
                          exchange 
                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                            join (function-call: algebricks:eq, Args:[%0->$$30, %0->$$31])
                            -- HYBRID_HASH_JOIN [$$30][$$31]  |PARTITIONED|
                              exchange 
                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                project ([$$27, $$30])
                                -- STREAM_PROJECT  |PARTITIONED|
                                  select (function-call: algebricks:and, Args:[function-call: algebricks:lt, Args:[%0->$$29, AString: {1993-10-01}], function-call: algebricks:ge, Args:[%0->$$29, AString: {1993-07-01}]])
                                  -- STREAM_SELECT  |PARTITIONED|
                                    project ([$$27, $$29, $$30])
                                    -- STREAM_PROJECT  |PARTITIONED|
                                      assign [$$27, $$29] <- [function-call: asterix:field-access-by-index, Args:[%0->$$4, AInt32: {5}], function-call: asterix:field-access-by-index, Args:[%0->$$4, AInt32: {4}]]
                                      -- ASSIGN  |PARTITIONED|
                                        exchange 
                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                          data-scan []<-[$$30, $$4] <- tpch:Orders
                                          -- DATASOURCE_SCAN  |PARTITIONED|
                                            exchange 
                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                              empty-tuple-source
                                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
                              exchange 
                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                distinct ([%0->$$31])
                                -- PRE_SORTED_DISTINCT_BY  |PARTITIONED|
                                  exchange 
                                  -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$31(ASC)] HASH:[$$31]  |PARTITIONED|
                                    project ([$$31])
                                    -- STREAM_PROJECT  |PARTITIONED|
                                      select (function-call: algebricks:lt, Args:[function-call: asterix:field-access-by-index, Args:[%0->$$5, AInt32: {11}], function-call: asterix:field-access-by-index, Args:[%0->$$5, AInt32: {12}]])
                                      -- STREAM_SELECT  |PARTITIONED|
                                        project ([$$5, $$31])
                                        -- STREAM_PROJECT  |PARTITIONED|
                                          exchange 
                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                            data-scan []<-[$$31, $$32, $$5] <- tpch:LineItem
                                            -- DATASOURCE_SCAN  |PARTITIONED|
                                              exchange 
                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                empty-tuple-source
                                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
      

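      The reason is that the optimized query plan assumes that the computation nodes are the same as the storage nodes, and therefore uses the wrong exchange strategies. This is consistent with the partition-count mismatch reported by the assertion above ("Dependency activity partitioned differently from dependent: 4 != 2").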

      Attachments

        Activity

          This comment will be viewable by all users.
          Cancel

          People

            buyingyi Yingyi Bu
            buyingyi Yingyi Bu
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved:

              Slack

                Issue deployment