From 39d54695517246dea667b7e8962b8d74694695a4 Mon Sep 17 00:00:00 2001 From: Tim Armstrong Date: Wed, 17 Aug 2016 00:35:14 -0700 Subject: [PATCH] IMPALA-3987: short-circuit joins with empty builds Sometimes join queries end up with an empty build side. In this case, for many join modes it is possible to exploit the empty build side to dramatically speed up the query by short-circuiting the join and returning 0 rows. TODO: IMPALA-3990 prevents this from working properly. Testing: Added functional tests for all join modes of hash join and nested loop join where the build side is empty. TODO: testing to exercise the cases where IMPALA-3990 would cause problems. --- be/src/exec/blocking-join-node.cc | 2 + be/src/exec/blocking-join-node.h | 7 + be/src/exec/nested-loop-join-node.cc | 15 +- be/src/exec/partitioned-hash-join-node.cc | 13 +- .../queries/QueryTest/empty-build-joins.test | 192 +++++++++++++++++++++ .../queries/QueryTest/single-node-nlj.test | 53 ++++++ .../queries/primitive_empty_build_join_1.test | 13 ++ tests/query_test/test_join_queries.py | 5 + 8 files changed, 296 insertions(+), 4 deletions(-) create mode 100644 testdata/workloads/functional-query/queries/QueryTest/empty-build-joins.test create mode 100644 testdata/workloads/targeted-perf/queries/primitive_empty_build_join_1.test diff --git a/be/src/exec/blocking-join-node.cc b/be/src/exec/blocking-join-node.cc index ea541c0..1a3bde2 100644 --- a/be/src/exec/blocking-join-node.cc +++ b/be/src/exec/blocking-join-node.cc @@ -236,6 +236,8 @@ Status BlockingJoinNode::ConstructBuildAndOpenProbe(RuntimeState* state, } else { RETURN_IF_ERROR(SendBuildInputToSink(state, build_sink)); } + // TODO: we could avoid opening the child here for certain join modes if we know the + // build side returned no rows. RETURN_IF_ERROR(child(0)->Open(state)); } return Status::OK(); diff --git a/be/src/exec/blocking-join-node.h b/be/src/exec/blocking-join-node.h index b3d25c1..b2e9ec1 100644 --- a/be/src/exec/blocking-join-node.h +++ b/be/src/exec/blocking-join-node.h @@ -161,6 +161,13 @@ class BlockingJoinNode : public ExecNode { join_op_ == TJoinOp::FULL_OUTER_JOIN; } + /// Returns true if the join needs to process unmatched probe rows, false otherwise. + bool NeedToProcessUnmatchedProbeRows() { + return join_op_ == TJoinOp::LEFT_OUTER_JOIN || join_op_ == TJoinOp::LEFT_ANTI_JOIN || + join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || + join_op_ == TJoinOp::FULL_OUTER_JOIN; + } + /// This function calculates the "local time" spent in the join node. /// /// The definition of "local time" is the wall clock time where this exec node is diff --git a/be/src/exec/nested-loop-join-node.cc b/be/src/exec/nested-loop-join-node.cc index 87fbf62..5bccfa1 100644 --- a/be/src/exec/nested-loop-join-node.cc +++ b/be/src/exec/nested-loop-join-node.cc @@ -87,8 +87,14 @@ Status NestedLoopJoinNode::Open(RuntimeState* state) { RETURN_IF_ERROR(ResetMatchingBuildRows(state, build_batches_->total_num_rows())); } } - RETURN_IF_ERROR(BlockingJoinNode::GetFirstProbeRow(state)); - ResetForProbe(); + + if (build_batches_->total_num_rows() != 0 || NeedToProcessUnmatchedProbeRows()) { + RETURN_IF_ERROR(BlockingJoinNode::GetFirstProbeRow(state)); + ResetForProbe(); + } else { + // Skip processing probe input for empty build side if allowed by join mode. + if (!IsInSubplan()) child(0)->Close(state); + } return Status::OK(); } @@ -207,6 +213,11 @@ Status NestedLoopJoinNode::GetNext(RuntimeState* state, RowBatch* output_batch, RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(QueryMaintenance(state)); *eos = false; + if (build_batches_->total_num_rows() == 0 && !NeedToProcessUnmatchedProbeRows()) { + // Skip processing probe input for empty build side when allowed by join mode. + eos_ = true; + goto end; + } if (!HasValidProbeRow()) { RETURN_IF_ERROR(NextProbeRow(state, output_batch)); diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc index a2b5001..b3bee4f 100644 --- a/be/src/exec/partitioned-hash-join-node.cc +++ b/be/src/exec/partitioned-hash-join-node.cc @@ -272,8 +272,13 @@ Status PartitionedHashJoinNode::Open(RuntimeState* state) { ExprContext::FreeLocalAllocations(probe_expr_ctxs_); RETURN_IF_ERROR(BlockingJoinNode::ConstructBuildAndOpenProbe(state, NULL)); - RETURN_IF_ERROR(BlockingJoinNode::GetFirstProbeRow(state)); - ResetForProbe(); + if (non_empty_build_ || NeedToProcessUnmatchedProbeRows()) { + RETURN_IF_ERROR(BlockingJoinNode::GetFirstProbeRow(state)); + ResetForProbe(); + } else { + // Skip processing probe input for empty build side when allowed by join mode. + if (!IsInSubplan()) child(0)->Close(state); + } DCHECK(null_aware_partition_ == NULL || join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN); return Status::OK(); } @@ -942,6 +947,10 @@ Status PartitionedHashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch if (ReachedLimit()) { *eos = true; return Status::OK(); + } else if (!non_empty_build_ && !NeedToProcessUnmatchedProbeRows()) { + // Skip processing probe input for empty build side if allowed by join mode. + *eos = true; + return Status::OK(); } else { *eos = false; } diff --git a/testdata/workloads/functional-query/queries/QueryTest/empty-build-joins.test b/testdata/workloads/functional-query/queries/QueryTest/empty-build-joins.test new file mode 100644 index 0000000..c2e7ec0 --- /dev/null +++ b/testdata/workloads/functional-query/queries/QueryTest/empty-build-joins.test @@ -0,0 +1,192 @@ +==== +---- QUERY +# Inner equi-join - executes with hash join. +select straight_join at.id +from alltypes at + inner join functional.alltypestiny att on at.id = att.id +where att.int_col = 999 +---- RESULTS +---- TYPES +INT +==== +---- QUERY +# Right equi-join - executes with hash join. +select straight_join at.id +from alltypes at + right join functional.alltypestiny att on at.id = att.id +where att.int_col = 999 +---- RESULTS +---- TYPES +INT +==== +---- QUERY +# Left equi-join - executes with hash join. +select straight_join at.id +from alltypes at + left join ( + select * from functional.alltypestiny where int_col = 999) att on at.id = att.id +order by at.id desc +limit 5 +---- RESULTS +7299 +7298 +7297 +7296 +7295 +---- TYPES +INT +==== +---- QUERY +# Full outer equi-join - executes with hash join. +select straight_join at.id +from alltypes at + full outer join ( + select * from functional.alltypestiny where int_col = 999) att on at.id = att.id +order by at.id desc +limit 5 +---- RESULTS +7299 +7298 +7297 +7296 +7295 +---- TYPES +INT +==== +---- QUERY +# Left semi equi-join - executes with hash join. +select straight_join at.id +from alltypes at +where id in ( + select id from functional.alltypestiny + where id = 999) +---- RESULTS +---- TYPES +INT +==== +---- QUERY +# Right semi equi-join - executes with hash join. +select straight_join at.id +from (select * from functional.alltypestiny att where int_col = 999) att + right semi join alltypes at on at.id = att.id +---- RESULTS +---- TYPES +INT +==== +---- QUERY +# Left NAAJ equi-join - executes with hash join. +select straight_join at.id +from alltypes at +where id not in ( + select id from functional.alltypestiny + where id = 999) +order by id desc +limit 5 +---- RESULTS +7299 +7298 +7297 +7296 +7295 +---- TYPES +INT +==== +---- QUERY +# Left anti equi-join - executes with hash join. +select straight_join at.id +from alltypes at +where not exists ( + select id from functional.alltypestiny att + where id = 999 and att.id = at.id) +order by id desc +limit 5 +---- RESULTS +7299 +7298 +7297 +7296 +7295 +---- TYPES +INT +==== +---- QUERY +# Right anti equi-join - executes with hash join. +select straight_join at.id +from (select * from functional.alltypestiny att where int_col = 999) att + right anti join alltypes at on at.id = att.id +order by at.id desc +limit 5 +---- RESULTS +7299 +7298 +7297 +7296 +7295 +---- TYPES +INT +==== +---- QUERY +# Inner non-equi-join - executes with nested loop join. +select straight_join at.id +from alltypes at + inner join functional.alltypestiny att on at.id < att.id +where att.int_col = 999 +---- RESULTS +---- TYPES +INT +==== +---- QUERY +# Cross join - executes with nested loop join. +select straight_join at.id +from alltypes at, functional.alltypestiny att +where att.int_col = 999 +---- RESULTS +---- TYPES +INT +==== +---- QUERY +# Left non-equi-join - executes with nested loop join. +select straight_join at.id +from alltypes at + left join ( + select * from functional.alltypestiny where int_col = 999) att on at.id < att.id +order by at.id desc +limit 5 +---- RESULTS +7299 +7298 +7297 +7296 +7295 +---- TYPES +INT +==== +---- QUERY +# Left semi non-equi-join - executes with nested loop join. +select straight_join at.id +from alltypes at + left semi join ( + select * from functional.alltypestiny att where int_col = 999) att on at.id < att.id +order by at.id desc +limit 5 +---- RESULTS +---- TYPES +INT +==== +---- QUERY +# Left anti non-equi-join - executes with nested loop join. +select straight_join at.id +from alltypes at LEFT ANTI JOIN ( + select * from functional.alltypestiny att + where id = 999) att on at.id < att.id +order by id desc +limit 5 +---- RESULTS +7299 +7298 +7297 +7296 +7295 +---- TYPES +INT +==== diff --git a/testdata/workloads/functional-query/queries/QueryTest/single-node-nlj.test b/testdata/workloads/functional-query/queries/QueryTest/single-node-nlj.test index bb22e16..0e5d2a2 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/single-node-nlj.test +++ b/testdata/workloads/functional-query/queries/QueryTest/single-node-nlj.test @@ -160,3 +160,56 @@ left join functional.alltypes a2 on a2.tinyint_col >= 1 ---- TYPES BIGINT ==== +---- QUERY +# Right non-equi-join with empty build. +set num_nodes=1; +select straight_join at.id +from alltypes at + right join functional.alltypestiny att on at.id < att.id +where att.int_col = 999 +---- RESULTS +---- TYPES +INT +==== +---- QUERY +# Full outer non-equi-join with empty build. +select straight_join at.id +from alltypes at + full outer join ( + select * from functional.alltypestiny where int_col = 999) att on at.id < att.id +order by at.id desc +limit 5 +---- RESULTS +7299 +7298 +7297 +7296 +7295 +---- TYPES +INT +==== +---- QUERY +# Right semi non-equi-join with empty build. +select straight_join at.id +from (select * from functional.alltypestiny att where int_col = 999) att + right semi join alltypes at on at.id < att.id +---- RESULTS +---- TYPES +INT +==== +---- QUERY +# Right anti non-equi-join with empty build. +select straight_join at.id +from (select * from functional.alltypestiny att where int_col = 999) att + right anti join alltypes at on at.id < att.id +order by at.id desc +limit 5 +---- RESULTS +7299 +7298 +7297 +7296 +7295 +---- TYPES +INT +==== diff --git a/testdata/workloads/targeted-perf/queries/primitive_empty_build_join_1.test b/testdata/workloads/targeted-perf/queries/primitive_empty_build_join_1.test new file mode 100644 index 0000000..ef0a47f --- /dev/null +++ b/testdata/workloads/targeted-perf/queries/primitive_empty_build_join_1.test @@ -0,0 +1,13 @@ +==== +---- QUERY: primitive_empty_build_join_1 +-- Description : Join with empty build side and large probe side. +-- Target test case : Analytic query with selective filters where evaluation of +-- the join can be short-circuited for a dramatic speedup. +SELECT /* +straight_join */ * +FROM lineitem +INNER JOIN orders ON l_orderkey = o_orderkey +WHERE o_comment = 'no matching comments' +---- RESULTS +---- TYPES +==== + diff --git a/tests/query_test/test_join_queries.py b/tests/query_test/test_join_queries.py index 3acd97e..14965d8 100644 --- a/tests/query_test/test_join_queries.py +++ b/tests/query_test/test_join_queries.py @@ -86,6 +86,11 @@ class TestJoinQueries(ImpalaTestSuite): new_vector.get_value('exec_option')['num_nodes'] = 1 self.run_test_case('QueryTest/single-node-nlj-exhaustive', new_vector) + def test_empty_build_joins(self, vector): + new_vector = copy(vector) + new_vector.get_value('exec_option')['batch_size'] = vector.get_value('batch_size') + self.run_test_case('QueryTest/empty-build-joins', new_vector) + class TestTPCHJoinQueries(ImpalaTestSuite): # Uses the TPC-H dataset in order to have larger joins. Needed for example to test # the repartitioning codepaths. -- 2.5.0