Details

    • Type: New Feature
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.4.0
    • Component/s: None
    • Labels:
      None

      Description

      Implement the hash join operator. The first version is a simple in-memory implementation and will not spill to disk.

      Scope is to support inner, left outer, right outer and full outer joins for simple equality conditions.
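The scope described above can be illustrated with a minimal sketch. It is not the code from the patch: the class, enum and method names are hypothetical, keys are plain ints, and a java.util.HashMap stands in for Drill's hash table. It follows the patch's convention that the left input is the probe side and the right input is the build side.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: a row-at-a-time, in-memory hash join over int keys.
// All names here are hypothetical and are not taken from DRILL-505.
final class SimpleHashJoin {
    enum JoinType { INNER, LEFT, RIGHT, FULL }

    // Each result is {probeIndex, buildIndex}; -1 marks the missing side.
    static List<int[]> join(int[] probeKeys, int[] buildKeys, JoinType type) {
        // Build phase: consume the entire build side before probing.
        Map<Integer, List<Integer>> table = new HashMap<>();
        for (int b = 0; b < buildKeys.length; b++) {
            table.computeIfAbsent(buildKeys[b], k -> new ArrayList<>()).add(b);
        }

        // Tracks which build rows matched at least one probe row.
        BitSet buildMatched = new BitSet(buildKeys.length);
        List<int[]> out = new ArrayList<>();

        // Probe phase: emit one pair per matching build row.
        for (int p = 0; p < probeKeys.length; p++) {
            List<Integer> matches = table.get(probeKeys[p]);
            if (matches != null) {
                for (int b : matches) {
                    buildMatched.set(b);
                    out.add(new int[] {p, b});
                }
            } else if (type == JoinType.LEFT || type == JoinType.FULL) {
                // Left and full outer joins keep unmatched probe rows.
                out.add(new int[] {p, -1});
            }
        }

        // Right and full outer joins also emit build rows that never matched.
        if (type == JoinType.RIGHT || type == JoinType.FULL) {
            for (int b = buildMatched.nextClearBit(0); b < buildKeys.length;
                 b = buildMatched.nextClearBit(b + 1)) {
                out.add(new int[] {-1, b});
            }
        }
        return out;
    }
}
```

Because the build side must be fully consumed before the first probe row is processed, the operator is blocking, and everything it has built stays in memory.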

      1. DRILL_505.patch
        76 kB
        Mehant Baid

        Activity

        ASF GitHub Bot added a comment -

        GitHub user mehant opened a pull request:

        https://github.com/apache/incubator-drill/pull/49

        DRILL-505 Implement hash join operator

        Support for left outer, right outer and full joins
        Tests and multi batch outer join WIP

        Support for multiple join conditions
        Add following tests

        • Multiple condition join
        • Join on JSON scan
        • Multi batch join
        • Simple equality join

        Fix memory leaks

        Cleanup

        You can merge this pull request into a Git repository by running:

        $ git pull https://github.com/mehant/incubator-drill DRILL-505_hash_join

        Alternatively you can review and apply these changes as the patch at:

        https://github.com/apache/incubator-drill/pull/49.patch

        To close this pull request, make a commit to your master/trunk branch
        with (at least) the following in the commit message:

        This closes #49


        commit 1969f911a6a8d440e3b4eb7c580c2d53cf0d06c7
        Author: Mehant Baid <mehantr@gmail.com>
        Date: 2014-02-25T09:09:06Z

        DRILL-505 Hash Join basic changes

        Support for left outer, right outer and full joins
        Tests and multi batch outer join WIP

        Support for multiple join conditions
        Add following tests

        • Multiple condition join
        • Join on JSON scan
        • Multi batch join
        • Simple equality join

        Fix memory leaks

        Cleanup


        ASF GitHub Bot added a comment -

        Github user amansinha100 commented on the pull request:

        https://github.com/apache/incubator-drill/pull/49#issuecomment-40535783

        One high-level comment regarding the code organization: normally, we recommend keeping the operator's internal control flow and logic in the template class (in this case HashJoinTemplate) and putting only what the code generator needs in the 'batch' class. Perhaps we should discuss this?

        ASF GitHub Bot added a comment -

        Github user amansinha100 commented on a diff in the pull request:

        https://github.com/apache/incubator-drill/pull/49#discussion_r11693506

        — Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java —
        @@ -0,0 +1,220 @@
        +/**
        + * Licensed to the Apache Software Foundation (ASF) under one
        + * or more contributor license agreements. See the NOTICE file
        + * distributed with this work for additional information
        + * regarding copyright ownership. The ASF licenses this file
        + * to you under the Apache License, Version 2.0 (the
        + * "License"); you may not use this file except in compliance
        + * with the License. You may obtain a copy of the License at
        + *
        + * http://www.apache.org/licenses/LICENSE-2.0
        + *
        + * Unless required by applicable law or agreed to in writing, software
        + * distributed under the License is distributed on an "AS IS" BASIS,
        + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        + * See the License for the specific language governing permissions and
        + * limitations under the License.
        + */
        +
        +package org.apache.drill.exec.physical.impl.join;
        +
        +import java.util.ArrayList;
        +import java.util.BitSet;
        +import java.util.LinkedList;
        +import java.util.List;
        +
        +import io.netty.buffer.ByteBuf;
        +
        +import org.apache.drill.exec.exception.SchemaChangeException;
        +import org.apache.drill.exec.ops.FragmentContext;
        +import org.apache.drill.exec.record.selection.SelectionVector4;
        +import org.apache.drill.exec.physical.impl.common.HashTable;
        +
        +
        +/*
        + * Helper class for hash join. Keeps track of information about the build side batches.
        + *
        + * Hash join is a blocking operator, so we consume all the batches on the build side and
        + * store them in a hyper container. The way we can retrieve records from the hyper container
        + * is by providing the record index and batch index in the hyper container. When we invoke put()
        + * for a given row, hash table returns a global index. We store the current row's record index
        + * and batch index in this global index of the startIndices structure.
        + *
        + * Since there can be many rows with the same key on the build side, we store the first
        + * index in the startIndices list and the remaining are stored as a logical linked list using
        + * the 'links' field in the BuildInfo structures.
        + *
        + * Apart from the indexes into the hyper container, this class also stores information about
        + * which records of the build side had a matching record on the probe side. Stored in a bitvector
        + * keyMatchBitVector, it is used to retrieve all records that did not match a record on probe side
        + * for right outer and full outer joins
        + */
        +public class HashJoinHelper {
        +
        + /* List of start indexes. Stores the record and batch index of the first record
        + * with a given key.
        + */
        + public List<SelectionVector4> startIndices = new ArrayList<>();
        +
        + // List of BuildInfo structures. Used to maintain auxiliary information about the build batches
        + public List<BuildInfo> buildInfoList = new ArrayList<>();
        +
        + // Fragment context
        + public FragmentContext context;
        +
        + // Constant to indicate index is empty.
        + public static final int INDEX_EMPTY = -1;
        +
        + public HashJoinHelper(FragmentContext context) {
        + this.context = context;
        + }
        +
        + public void addStartIndexBatch() throws SchemaChangeException {
        + startIndices.add(getNewSV4(HashTable.BATCH_SIZE));
        + }
        +
        + public class BuildInfo {
        + // List of links. Logically it helps maintain a linked list of records with the same key value
        + private SelectionVector4 links;
        +
        + // List of bitvectors. Keeps track of records on the build side that matched a record on the probe side
        + private BitSet keyMatchBitVector;
        +
        + // number of records in this batch
        + private int recordCount;
        +
        + public BuildInfo(SelectionVector4 links, BitSet keyMatchBitVector, int recordCount) {
        + this.links = links;
        + this.keyMatchBitVector = keyMatchBitVector;
        + this.recordCount = recordCount;
        + }
        +
        + public SelectionVector4 getLinks() {
        + return links;
        + }
        +
        + public BitSet getKeyMatchBitVector() {
        + return keyMatchBitVector;
        + }
        + }
        +
        + public SelectionVector4 getNewSV4(int recordCount) throws SchemaChangeException {
        +
        + ByteBuf vector = context.getAllocator().buffer((recordCount * 4));
        +
        + SelectionVector4 sv4 = new SelectionVector4(vector, recordCount, recordCount);
        +
        + // Initialize the vector
        + for (int i = 0; i < recordCount; i++) {
        + sv4.set(i, INDEX_EMPTY);
        + }
        +
        + return sv4;
        + }
        +
        + public void addNewBatch(int recordCount) throws SchemaChangeException {
        + // Add a node to the list of BuildInfo's
        + BuildInfo info = new BuildInfo(getNewSV4(recordCount), new BitSet(recordCount), recordCount);
        + buildInfoList.add(info);
        + }
        +
        + public int getStartIndex(int keyIndex) {
        + int batchIdx = keyIndex / HashTable.BATCH_SIZE;
        + int offsetIdx = keyIndex % HashTable.BATCH_SIZE;
        +
        + assert batchIdx < startIndices.size();
        +
        + SelectionVector4 sv4 = startIndices.get(batchIdx);
        +
        + return sv4.get(offsetIdx);
        + }
        +
        + public int getNextIndex(int currentIdx) {
        + // Get to the links field of the current index to get the next index
        + int batchIdx = currentIdx >>> 16;
        + int recordIdx = currentIdx & 65535;
        — End diff –

        Would be preferable to use a predefined constant such as HashTable.BATCH_MASK.
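A minimal sketch of what this suggestion could look like, assuming the composite index keeps the batch index in the upper 16 bits and the record index in the lower 16 bits, as the quoted getNextIndex() code implies. The constant and method names below are hypothetical; the diff does not show how HashTable.BATCH_MASK is defined.

```java
// Illustrative only: named constants in place of the literals 16 and 65535.
// BATCH_SHIFT, RECORD_MASK and the helper methods are hypothetical names.
final class CompositeIndex {
    // Upper 16 bits hold the batch index.
    static final int BATCH_SHIFT = 16;
    // Lower 16 bits hold the record index within the batch (0xFFFF == 65535).
    static final int RECORD_MASK = (1 << BATCH_SHIFT) - 1;

    static int encode(int batchIdx, int recordIdx) {
        return (batchIdx << BATCH_SHIFT) | (recordIdx & RECORD_MASK);
    }

    static int batchOf(int compositeIdx) {
        // Unsigned shift so a set top bit does not sign-extend.
        return compositeIdx >>> BATCH_SHIFT;
    }

    static int recordOf(int compositeIdx) {
        return compositeIdx & RECORD_MASK;
    }
}
```

Keeping the shift and the mask in one place means the encoding and both decoders cannot drift apart if the layout ever changes.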

        ASF GitHub Bot added a comment -

        Github user amansinha100 commented on a diff in the pull request:

        https://github.com/apache/incubator-drill/pull/49#discussion_r11695133

        — Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java —
        @@ -0,0 +1,220 @@
        +/**
        + * Licensed to the Apache Software Foundation (ASF) under one
        + * or more contributor license agreements. See the NOTICE file
        + * distributed with this work for additional information
        + * regarding copyright ownership. The ASF licenses this file
        + * to you under the Apache License, Version 2.0 (the
        + * "License"); you may not use this file except in compliance
        + * with the License. You may obtain a copy of the License at
        + *
        + * http://www.apache.org/licenses/LICENSE-2.0
        + *
        + * Unless required by applicable law or agreed to in writing, software
        + * distributed under the License is distributed on an "AS IS" BASIS,
        + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        + * See the License for the specific language governing permissions and
        + * limitations under the License.
        + */
        +
        +package org.apache.drill.exec.physical.impl.join;
        +
        +import java.util.ArrayList;
        +import java.util.BitSet;
        +import java.util.LinkedList;
        +import java.util.List;
        +
        +import io.netty.buffer.ByteBuf;
        +
        +import org.apache.drill.exec.exception.SchemaChangeException;
        +import org.apache.drill.exec.ops.FragmentContext;
        +import org.apache.drill.exec.record.selection.SelectionVector4;
        +import org.apache.drill.exec.physical.impl.common.HashTable;
        +
        +
        +/*
        + * Helper class for hash join. Keeps track of information about the build side batches.
        + *
        + * Hash join is a blocking operator, so we consume all the batches on the build side and
        + * store them in a hyper container. The way we can retrieve records from the hyper container
        + * is by providing the record index and batch index in the hyper container. When we invoke put()
        + * for a given row, hash table returns a global index. We store the current row's record index
        + * and batch index in this global index of the startIndices structure.
        + *
        + * Since there can be many rows with the same key on the build side, we store the first
        + * index in the startIndices list and the remaining are stored as a logical linked list using
        + * the 'links' field in the BuildInfo structures.
        + *
        + * Apart from the indexes into the hyper container, this class also stores information about
        + * which records of the build side had a matching record on the probe side. Stored in a bitvector
        + * keyMatchBitVector, it is used to retrieve all records that did not match a record on probe side
        + * for right outer and full outer joins
        + */
        +public class HashJoinHelper {
        +
        + /* List of start indexes. Stores the record and batch index of the first record
        + * with a given key.
        + */
        + public List<SelectionVector4> startIndices = new ArrayList<>();
        +
        + // List of BuildInfo structures. Used to maintain auxiliary information about the build batches
        + public List<BuildInfo> buildInfoList = new ArrayList<>();
        +
        + // Fragment context
        + public FragmentContext context;
        +
        + // Constant to indicate index is empty.
        + public static final int INDEX_EMPTY = -1;
        +
        + public HashJoinHelper(FragmentContext context) {
        + this.context = context;
        + }
        +
        + public void addStartIndexBatch() throws SchemaChangeException {
        + startIndices.add(getNewSV4(HashTable.BATCH_SIZE));
        + }
        +
        + public class BuildInfo {
        + // List of links. Logically it helps maintain a linked list of records with the same key value
        + private SelectionVector4 links;
        +
        + // List of bitvectors. Keeps track of records on the build side that matched a record on the probe side
        + private BitSet keyMatchBitVector;
        +
        + // number of records in this batch
        + private int recordCount;
        +
        + public BuildInfo(SelectionVector4 links, BitSet keyMatchBitVector, int recordCount) {
        + this.links = links;
        + this.keyMatchBitVector = keyMatchBitVector;
        + this.recordCount = recordCount;
        + }
        +
        + public SelectionVector4 getLinks() {
        + return links;
        + }
        +
        + public BitSet getKeyMatchBitVector() {
        + return keyMatchBitVector;
        + }
        + }
        +
        + public SelectionVector4 getNewSV4(int recordCount) throws SchemaChangeException {
        +
        + ByteBuf vector = context.getAllocator().buffer((recordCount * 4));
        +
        + SelectionVector4 sv4 = new SelectionVector4(vector, recordCount, recordCount);
        +
        + // Initialize the vector
        + for (int i = 0; i < recordCount; i++) {
        + sv4.set(i, INDEX_EMPTY);
        + }
        +
        + return sv4;
        + }
        +
        + public void addNewBatch(int recordCount) throws SchemaChangeException {
        + // Add a node to the list of BuildInfo's
        + BuildInfo info = new BuildInfo(getNewSV4(recordCount), new BitSet(recordCount), recordCount);
        + buildInfoList.add(info);
        + }
        +
        + public int getStartIndex(int keyIndex) {
        + int batchIdx = keyIndex / HashTable.BATCH_SIZE;
        + int offsetIdx = keyIndex % HashTable.BATCH_SIZE;
        +
        + assert batchIdx < startIndices.size();
        +
        + SelectionVector4 sv4 = startIndices.get(batchIdx);
        +
        + return sv4.get(offsetIdx);
        + }
        +
        + public int getNextIndex(int currentIdx) {
        + // Get to the links field of the current index to get the next index
        + int batchIdx = currentIdx >>> 16;
        + int recordIdx = currentIdx & 65535;
        +
        + assert batchIdx < buildInfoList.size();
        +
        + // Get the corresponding BuildInfo node
        + BuildInfo info = buildInfoList.get(batchIdx);
        + return info.getLinks().get(recordIdx);
        + }
        +
        + public LinkedList<Integer> getNextUnmatchedIndex() {
        + LinkedList<Integer> compositeIndexes = new LinkedList<>();
        — End diff –

        In this and other places where you use LinkedList, do you really need it? If you are only appending to the list, ArrayList should be sufficient; it performs better and has a smaller memory footprint.
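As a sketch of this suggestion, the append-only collection of unmatched build-side records could use an ArrayList. The body of getNextUnmatchedIndex() is not shown in the quoted diff, so the class below is an assumption about its general shape rather than the patch's code: one BitSet of matched records per build batch, and composite indexes with the batch in the upper 16 bits.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

// Illustrative only: the class and method names are hypothetical.
final class UnmatchedCollector {
    // Returns composite indexes (batch << 16 | record) of every build-side
    // record whose "matched" bit is still clear.
    static List<Integer> unmatched(List<BitSet> matchedPerBatch, int[] recordCounts) {
        // Append-only usage: ArrayList avoids LinkedList's per-element node objects.
        List<Integer> compositeIndexes = new ArrayList<>();
        for (int batch = 0; batch < matchedPerBatch.size(); batch++) {
            BitSet matched = matchedPerBatch.get(batch);
            // nextClearBit skips directly to the next unmatched record.
            for (int rec = matched.nextClearBit(0); rec < recordCounts[batch];
                 rec = matched.nextClearBit(rec + 1)) {
                compositeIndexes.add((batch << 16) | rec);
            }
        }
        return compositeIndexes;
    }
}
```

Declaring the variable as List rather than a concrete class keeps callers independent of the implementation, so the two list types can be swapped without touching the call sites.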

        ASF GitHub Bot added a comment -

        Github user amansinha100 commented on a diff in the pull request:

        https://github.com/apache/incubator-drill/pull/49#discussion_r11695140

        — Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java —
        @@ -0,0 +1,519 @@
        +/**
        + * Licensed to the Apache Software Foundation (ASF) under one
        + * or more contributor license agreements. See the NOTICE file
        + * distributed with this work for additional information
        + * regarding copyright ownership. The ASF licenses this file
        + * to you under the Apache License, Version 2.0 (the
        + * "License"); you may not use this file except in compliance
        + * with the License. You may obtain a copy of the License at
        + *
        + * http://www.apache.org/licenses/LICENSE-2.0
        + *
        + * Unless required by applicable law or agreed to in writing, software
        + * distributed under the License is distributed on an "AS IS" BASIS,
        + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        + * See the License for the specific language governing permissions and
        + * limitations under the License.
        + */
        +package org.apache.drill.exec.physical.impl.join;
        +
        +import org.eigenbase.rel.JoinRelType;
        +
        +import java.io.IOException;
        +import java.util.LinkedList;
        +
        +import com.sun.codemodel.JExpression;
        +import com.sun.codemodel.JVar;
        +import com.sun.codemodel.JExpr;
        +
        +import org.apache.drill.exec.compile.sig.GeneratorMapping;
        +import org.apache.drill.exec.compile.sig.MappingSet;
        +import org.apache.drill.exec.exception.ClassTransformationException;
        +import org.apache.drill.exec.exception.SchemaChangeException;
        +import org.apache.drill.exec.expr.ClassGenerator;
        +import org.apache.drill.exec.expr.CodeGenerator;
        +import org.apache.drill.exec.expr.TypeHelper;
        +import org.apache.drill.exec.expr.holders.IntHolder;
        +import org.apache.drill.exec.ops.FragmentContext;
        +import org.apache.drill.exec.physical.config.HashJoinPOP;
        +import org.apache.drill.exec.physical.impl.common.ChainedHashTable;
        +import org.apache.drill.exec.physical.impl.common.HashTable;
        +import org.apache.drill.exec.physical.impl.common.HashTableConfig;
        +import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
        +import org.apache.drill.exec.physical.impl.svremover.RemovingRecordBatch;
        +import org.apache.drill.exec.record.RecordBatch;
        +import org.apache.drill.exec.record.AbstractRecordBatch;
        +import org.apache.drill.exec.record.ExpandableHyperContainer;
        +import org.apache.drill.exec.record.BatchSchema;
        +import org.apache.drill.exec.record.VectorWrapper;
        +import org.apache.drill.exec.record.TypedFieldId;
        +import org.apache.drill.exec.vector.ValueVector;
        +import org.apache.drill.exec.vector.allocator.VectorAllocator;
        +
        +public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
        + // Probe side record batch
        + private final RecordBatch left;
        +
        + // Build side record batch
        + private final RecordBatch right;
        +
        + // Join type, INNER, LEFT, RIGHT or OUTER
        + private final JoinRelType joinType;
        +
        + // hash table configuration, created in HashJoinPOP
        + private HashTableConfig htConfig;
        +
        + // Runtime generated class implementing HashJoin interface
        + private HashJoin hashJoin = null;
        +
        + /* Helper class
        + * Maintains linked list of build side records with the same key
        + * Keeps information about which build records have a corresponding
        + * matching key in the probe side (for outer, right joins)
        + */
        + private HashJoinHelper hjHelper = null;
        +
        + // Underlying hashtable used by the hash join
        + private HashTable hashTable = null;
        +
        + /* Hyper container to store all build side record batches.
        + * Records are retrieved from this container when there is a matching record
        + * on the probe side
        + */
        + private ExpandableHyperContainer hyperContainer;
        +
        + // Number of records to process on the probe side
        + private int recordsToProcess;
        +
        + // Number of records processed on the probe side
        + private int recordsProcessed;
        +
        + // Number of records in the output container
        + private int outputRecords;
        +
        + // Current batch index on the build side
        + private int buildBatchIndex = 0;
        +
        + // Indicate if we should drain the next record from the probe side
        + private boolean getNextRecord = true;
        +
        + // Contains both batch idx and record idx of the matching record in the build side
        + private int currentCompositeIdx = -1;
        +
        + // Current state the hash join algorithm is in
        + private HashJoinState hjState = HashJoinState.BUILD;
        +
        + // For outer or right joins, this is a list of unmatched records that needs to be projected
        + private LinkedList<Integer> unmatchedBuildIndexes = null;
        +
        + // List of vector allocators
        + private LinkedList<VectorAllocator> allocators = null;
        +
        + // Schema of the build side
        + private BatchSchema rightSchema = null;
        +
        + // Generator mapping for the build side
        + private static final GeneratorMapping PROJECT_BUILD = GeneratorMapping.create("doSetup"/* setup method */,
        + "projectBuildRecord" /* eval method */,
        + null /* reset */, null /* cleanup */);
        +
        + // Generator mapping for the probe side
        + private static final GeneratorMapping PROJECT_PROBE = GeneratorMapping.create("doSetup" /* setup method */,
        + "projectProbeRecord" /* eval method */,
        + null /* reset */, null /* cleanup */);
        +
        + // Mapping set for the build side
        + private final MappingSet projectBuildMapping = new MappingSet("buildIndex" /* read index */, "outIndex" /* write index */,
        + "buildBatch" /* read container */,
        + "outgoing" /* write container */,
        + PROJECT_BUILD, PROJECT_BUILD);
        +
        + // Mapping set for the probe side
        + private final MappingSet projectProbeMapping = new MappingSet("probeIndex" /* read index */, "outIndex" /* write index */,
        + "probeBatch" /* read container */,
        + "outgoing" /* write container */,
        + PROJECT_PROBE, PROJECT_PROBE);
        +
        + // Possible states for the hash join
        + public static enum HashJoinState {
        + BUILD, // Build phase, go through the 'right' record batches and create the hash table
        + PROBE, // Go through the 'left' record batches, probe the hash table project the records if the key matches
        + PROJECT_RIGHT, // If its a RIGHT OUTER or FULL join go through the build records that didn't match a probe record and project them
        + DONE // Done processing all records get out
        + }
        +
        + @Override
        + public int getRecordCount() {
        + return outputRecords;
        + }
        +
        +
        + @Override
        + public IterOutcome next() {
        +
        + IterOutcome leftUpstream = IterOutcome.NONE;
        +
        + try {
        +
        + if (hjState == HashJoinState.BUILD) {
        +
        + // Initialize the hash join helper context
        + hjHelper = new HashJoinHelper(context);
        +
        + /* Build phase requires setting up the hash table. Hash table will
        + * materialize both the build and probe side expressions while
        + * creating the hash table. So we need to invoke next() on our probe batch
        + * as well, for the materialization to be successful. This batch will not be used
        + * till we complete the build phase.
        + */
        + left.next();
        — End diff –

        Does next() on the left (probe) side need to be called only once, when we start the build phase? Once we have the probe side expression for materialization, I think we don't need to call next() again on the left side as long as the schema does not change. We should also consider whether there is an alternative way to get access to the expressions on the build side without fetching the rows.

        ASF GitHub Bot added a comment -

        Github user mehant commented on the pull request:

        https://github.com/apache/incubator-drill/pull/49#issuecomment-40775688

        Moved the logic that invokes run time generated code into the template so it has a better chance of getting optimized.

        Addressed the other minor comments as well.
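
The change described in this comment can be pictured with the rough sketch below. It is not Drill's code: `TemplateSketch`, `ProbeTemplate`, `CopyingProbe`, and the two-argument `projectProbeRecord` signature are hypothetical names chosen for illustration, and the subclass is written by hand here, whereas Drill generates it at run time. The idea shown is only that the per-record loop lives in the template class, next to the hook that the generated code fills in.

```java
import java.util.Arrays;

public class TemplateSketch {
    /** Hand-written template: owns the per-record loop. */
    public abstract static class ProbeTemplate {
        // Hook that run-time generated code would implement (hypothetical signature).
        protected abstract void projectProbeRecord(int probeIndex, int outIndex);

        // The driving loop is kept in the same class as the hook it calls.
        public final int projectAll(int recordCount) {
            int outIndex = 0;
            for (int probeIndex = 0; probeIndex < recordCount; probeIndex++) {
                projectProbeRecord(probeIndex, outIndex++);
            }
            return outIndex; // number of records written
        }
    }

    /** Hand-written stand-in for a generated subclass: copies one int column. */
    public static final class CopyingProbe extends ProbeTemplate {
        private final int[] in;
        private final int[] out;

        public CopyingProbe(int[] in, int[] out) {
            this.in = in;
            this.out = out;
        }

        @Override
        protected void projectProbeRecord(int probeIndex, int outIndex) {
            out[outIndex] = in[probeIndex];
        }
    }

    /** Runs the template loop over one input column and returns the projected copy. */
    public static int[] run(int[] input) {
        int[] output = new int[input.length];
        int written = new CopyingProbe(input, output).projectAll(input.length);
        return Arrays.copyOf(output, written);
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run(new int[] {3, 1, 4})));
    }
}
```

With this layout the operator makes a single call per batch (`projectAll`) rather than one call per record into the generated class, which is one plausible reading of "a better chance of getting optimized".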

        ASF GitHub Bot added a comment -

        Github user mehant commented on a diff in the pull request:

        https://github.com/apache/incubator-drill/pull/49#discussion_r11759876

        — Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java —
        @@ -0,0 +1,519 @@
        +/**
        + * Licensed to the Apache Software Foundation (ASF) under one
        + * or more contributor license agreements. See the NOTICE file
        + * distributed with this work for additional information
        + * regarding copyright ownership. The ASF licenses this file
        + * to you under the Apache License, Version 2.0 (the
        + * "License"); you may not use this file except in compliance
        + * with the License. You may obtain a copy of the License at
        + *
        + * http://www.apache.org/licenses/LICENSE-2.0
        + *
        + * Unless required by applicable law or agreed to in writing, software
        + * distributed under the License is distributed on an "AS IS" BASIS,
        + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        + * See the License for the specific language governing permissions and
        + * limitations under the License.
        + */
        +package org.apache.drill.exec.physical.impl.join;
        +
        +import org.eigenbase.rel.JoinRelType;
        +
        +import java.io.IOException;
        +import java.util.LinkedList;
        +
        +import com.sun.codemodel.JExpression;
        +import com.sun.codemodel.JVar;
        +import com.sun.codemodel.JExpr;
        +
        +import org.apache.drill.exec.compile.sig.GeneratorMapping;
        +import org.apache.drill.exec.compile.sig.MappingSet;
        +import org.apache.drill.exec.exception.ClassTransformationException;
        +import org.apache.drill.exec.exception.SchemaChangeException;
        +import org.apache.drill.exec.expr.ClassGenerator;
        +import org.apache.drill.exec.expr.CodeGenerator;
        +import org.apache.drill.exec.expr.TypeHelper;
        +import org.apache.drill.exec.expr.holders.IntHolder;
        +import org.apache.drill.exec.ops.FragmentContext;
        +import org.apache.drill.exec.physical.config.HashJoinPOP;
        +import org.apache.drill.exec.physical.impl.common.ChainedHashTable;
        +import org.apache.drill.exec.physical.impl.common.HashTable;
        +import org.apache.drill.exec.physical.impl.common.HashTableConfig;
        +import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
        +import org.apache.drill.exec.physical.impl.svremover.RemovingRecordBatch;
        +import org.apache.drill.exec.record.RecordBatch;
        +import org.apache.drill.exec.record.AbstractRecordBatch;
        +import org.apache.drill.exec.record.ExpandableHyperContainer;
        +import org.apache.drill.exec.record.BatchSchema;
        +import org.apache.drill.exec.record.VectorWrapper;
        +import org.apache.drill.exec.record.TypedFieldId;
        +import org.apache.drill.exec.vector.ValueVector;
        +import org.apache.drill.exec.vector.allocator.VectorAllocator;
        +
        +public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
        + // Probe side record batch
        + private final RecordBatch left;
        +
        + // Build side record batch
        + private final RecordBatch right;
        +
        + // Join type, INNER, LEFT, RIGHT or OUTER
        + private final JoinRelType joinType;
        +
        + // hash table configuration, created in HashJoinPOP
        + private HashTableConfig htConfig;
        +
        + // Runtime generated class implementing HashJoin interface
        + private HashJoin hashJoin = null;
        +
        + /* Helper class
        + * Maintains linked list of build side records with the same key
        + * Keeps information about which build records have a corresponding
        + * matching key in the probe side (for outer, right joins)
        + */
        + private HashJoinHelper hjHelper = null;
        +
        + // Underlying hashtable used by the hash join
        + private HashTable hashTable = null;
        +
        + /* Hyper container to store all build side record batches.
        + * Records are retrieved from this container when there is a matching record
        + * on the probe side
        + */
        + private ExpandableHyperContainer hyperContainer;
        +
        + // Number of records to process on the probe side
        + private int recordsToProcess;
        +
        + // Number of records processed on the probe side
        + private int recordsProcessed;
        +
        + // Number of records in the output container
        + private int outputRecords;
        +
        + // Current batch index on the build side
        + private int buildBatchIndex = 0;
        +
        + // Indicate if we should drain the next record from the probe side
        + private boolean getNextRecord = true;
        +
        + // Contains both batch idx and record idx of the matching record in the build side
        + private int currentCompositeIdx = -1;
        +
        + // Current state the hash join algorithm is in
        + private HashJoinState hjState = HashJoinState.BUILD;
        +
        + // For outer or right joins, this is a list of unmatched records that needs to be projected
        + private LinkedList<Integer> unmatchedBuildIndexes = null;
        +
        + // List of vector allocators
        + private LinkedList<VectorAllocator> allocators = null;
        +
        + // Schema of the build side
        + private BatchSchema rightSchema = null;
        +
        + // Generator mapping for the build side
        + private static final GeneratorMapping PROJECT_BUILD = GeneratorMapping.create("doSetup"/* setup method */,
        + "projectBuildRecord" /* eval method */,
        + null /* reset */, null /* cleanup */);
        +
        + // Generator mapping for the probe side
        + private static final GeneratorMapping PROJECT_PROBE = GeneratorMapping.create("doSetup" /* setup method */,
        + "projectProbeRecord" /* eval method */,
        + null /* reset */, null /* cleanup */);
        +
        + // Mapping set for the build side
        + private final MappingSet projectBuildMapping = new MappingSet("buildIndex" /* read index */, "outIndex" /* write index */,
        + "buildBatch" /* read container */,
        + "outgoing" /* write container */,
        + PROJECT_BUILD, PROJECT_BUILD);
        +
        + // Mapping set for the probe side
        + private final MappingSet projectProbeMapping = new MappingSet("probeIndex" /* read index */, "outIndex" /* write index */,
        + "probeBatch" /* read container */,
        + "outgoing" /* write container */,
        + PROJECT_PROBE, PROJECT_PROBE);
        +
        + // Possible states for the hash join
        + public static enum HashJoinState {
        + BUILD, // Build phase, go through the 'right' record batches and create the hash table
        + PROBE, // Go through the 'left' record batches, probe the hash table project the records if the key matches
        + PROJECT_RIGHT, // If its a RIGHT OUTER or FULL join go through the build records that didn't match a probe record and project them
        + DONE // Done processing all records get out
        + }
        +
        + @Override
        + public int getRecordCount() {
        + return outputRecords;
        + }
        +
        +
        + @Override
        + public IterOutcome next() {
        +
        + IterOutcome leftUpstream = IterOutcome.NONE;
        +
        + try {
        +
        + if (hjState == HashJoinState.BUILD) {
        +
        + // Initialize the hash join helper context
        + hjHelper = new HashJoinHelper(context);
        +
        + /* Build phase requires setting up the hash table. Hash table will
        + * materialize both the build and probe side expressions while
        + * creating the hash table. So we need to invoke next() on our probe batch
        + * as well, for the materialization to be successful. This batch will not be used
        + * till we complete the build phase.
        + */
        + left.next();
        — End diff –

        Yes, we call next() on the probe side exactly once before we complete the build phase.
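
For readers following the thread, the flow under discussion can be sketched as a small in-memory state machine. This is a simplified illustration, not the patch's implementation: `HashJoinSketch`, its `join` method, and the row encoding (`int[]{key, value}`, output as `"probeValue,buildValue"` strings) are assumptions made for the example. It mirrors the BUILD, PROBE, PROJECT_RIGHT, DONE states from the diff and pulls one probe batch during BUILD, holding it untouched until the probe phase.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class HashJoinSketch {
    public enum JoinType { INNER, LEFT, RIGHT, FULL }
    public enum State { BUILD, PROBE, PROJECT_RIGHT, DONE }

    /** Rows are int[]{key, value}; a missing side is rendered as "null". */
    public static List<String> join(Iterator<List<int[]>> probe, List<int[]> build, JoinType type) {
        Map<Integer, List<Integer>> table = new HashMap<>(); // key -> build row indexes
        boolean[] matched = new boolean[build.size()];       // build rows seen by some probe row
        List<String> out = new ArrayList<>();
        List<int[]> pending = null;                           // probe batch fetched during BUILD
        State state = State.BUILD;

        while (state != State.DONE) {
            switch (state) {
                case BUILD:
                    // Pull a single probe batch up front (stand-in for left.next());
                    // it is only held here and not processed until PROBE.
                    if (probe.hasNext()) {
                        pending = probe.next();
                    }
                    for (int i = 0; i < build.size(); i++) {
                        table.computeIfAbsent(build.get(i)[0], k -> new ArrayList<>()).add(i);
                    }
                    state = State.PROBE;
                    break;
                case PROBE:
                    while (pending != null) {
                        for (int[] p : pending) {
                            List<Integer> hits = table.get(p[0]);
                            if (hits != null) {
                                for (int i : hits) {
                                    matched[i] = true;
                                    out.add(p[1] + "," + build.get(i)[1]);
                                }
                            } else if (type == JoinType.LEFT || type == JoinType.FULL) {
                                out.add(p[1] + ",null");
                            }
                        }
                        pending = probe.hasNext() ? probe.next() : null;
                    }
                    state = (type == JoinType.RIGHT || type == JoinType.FULL)
                            ? State.PROJECT_RIGHT : State.DONE;
                    break;
                case PROJECT_RIGHT:
                    // Emit build rows that never matched a probe row.
                    for (int i = 0; i < build.size(); i++) {
                        if (!matched[i]) {
                            out.add("null," + build.get(i)[1]);
                        }
                    }
                    state = State.DONE;
                    break;
                default:
                    state = State.DONE;
            }
        }
        return out;
    }
}
```

The sketch ignores schema changes, batching of output, and memory management, all of which the real operator has to handle.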

        Mehant Baid added a comment -

        Updated patch by moving logic that interacts with run time generated code into the template file.

        Jacques Nadeau added a comment -

        Merged in 1fc7b98

        ASF GitHub Bot added a comment -

        Github user adityakishore commented on the pull request:

        https://github.com/apache/incubator-drill/pull/49#issuecomment-40960496

        Merged as 1fc7b982414bc0dcd29b1d31e312d2207971933a. @mehant, please close this pull request.

        ASF GitHub Bot added a comment -

        Github user mehant closed the pull request at:

        https://github.com/apache/incubator-drill/pull/49


          People

          • Assignee:
            Mehant Baid
            Reporter:
            Mehant Baid
          • Votes:
            0
            Watchers:
            3
