On some TPC-DS queries with mt_dop > 0, LOCAL runtime filters go missing. I.e. the scan waits for RUNTIME_FILTER_WAIT_TIME_MS and they never show up. I can reproduce in my minicluster on tpcds_parquet tpcds-q77.sql profile_50467cb8e73eeac4_853461b400000000
Interestingly, on this one run, one impalad received the filters fine and the others didn't get them. I set -vmodule=runtime-filter-bank=3 and it looks like it might be related to whether the consumer filter is registered before the producer. Here are logs from the good and bad daemons.
tarmstrong@tarmstrong-Precision-7540:~/impala/impala$ grep 50467cb8e73eeac4 logs/cluster/impalad.INFO | grep filter-bank | grep 'filter 22'
I0410 15:32:10.422222 29384 runtime-filter-bank.cc:124] 50467cb8e73eeac4:853461b400000022] registered consumer filter 22
I0410 15:32:10.433528 29387 runtime-filter-bank.cc:129] 50467cb8e73eeac4:853461b400000023] re-registered consumer filter 22
I0410 15:32:10.460548 29389 runtime-filter-bank.cc:129] 50467cb8e73eeac4:853461b400000024] re-registered consumer filter 22
I0410 15:32:10.482293 29392 runtime-filter-bank.cc:129] 50467cb8e73eeac4:853461b400000025] re-registered consumer filter 22
I0410 15:32:12.627218 29558 runtime-filter-bank.cc:186] 50467cb8e73eeac4:853461b4000000f3] Setting broadcast filter 22
tarmstrong@tarmstrong-Precision-7540:~/impala/impala$ grep 50467cb8e73eeac4 logs/cluster/impalad_node1.INFO | grep filter-bank | grep 'filter 22'
I0410 15:32:12.018474 29402 runtime-filter-bank.cc:186] 50467cb8e73eeac4:853461b4000000f2] Setting broadcast filter 22
I0410 15:32:12.182348 29580 runtime-filter-bank.cc:124] 50467cb8e73eeac4:853461b40000001e] registered consumer filter 22
I0410 15:32:12.212008 29581 runtime-filter-bank.cc:129] 50467cb8e73eeac4:853461b40000001f] re-registered consumer filter 22
I0410 15:32:12.236542 29582 runtime-filter-bank.cc:129] 50467cb8e73eeac4:853461b400000020] re-registered consumer filter 22
I0410 15:32:12.250748 29583 runtime-filter-bank.cc:129] 50467cb8e73eeac4:853461b400000021] re-registered consumer filter 22
It looks like with mt_dop=0, this works because they are both registered in Prepare() of the same fragment. But with mt_dop>1, the fragments start up independently and the filter might be published before the consumer registers. This doesn't appear to be handled.
Thanks to drorke for finding this.