Phoenix / PHOENIX-1779

Parallelize fetching of next batch of records for scans corresponding to queries with no order by

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 4.4.0
    • Labels: None

      Description

      Today in Phoenix we parallelize only the first execution of scans, i.e. we load only the first batch of records (up to the scan's cache size) in parallel. Loading subsequent batches of records in scanners is essentially serial. This could be improved, especially for queries that do not need any kind of merge sort on the client, such as those with no ORDER BY clause. This could also potentially improve the performance of UPSERT SELECT statements that load data from one table and insert it into another; one such use case is creating immutable indexes for tables that already have data. It could also potentially improve the performance of our MapReduce bulk-loading solution by speeding up the loading/mapping phase.

      1. wip.patch
        36 kB
        Samarth Jain
      2. wipwithsplits.patch
        70 kB
        Samarth Jain
      3. wip3.patch
        72 kB
        Samarth Jain
      4. PHOENIX-1779.patch
        58 kB
        Samarth Jain
      5. PHOENIX-1779_v2.patch
        62 kB
        Samarth Jain
      6. PHOENIX-1779_v3.patch
        60 kB
        Samarth Jain


          Activity

          Samarth Jain added a comment -

          Parking the work-in-progress patch. Need to address the test failures arising from switching the test-only default of forcing Phoenix to return rows in row key order from true to false.

          James Taylor added a comment -

          Awesome, Samarth Jain. Apart from the test failures, does it function correctly? It would be interesting to use our performance.py script to get a rough idea about perf.

          Samarth Jain added a comment -

          I verified manually that queries and upsert selects are working fine. Of course that isn't in any way sufficient. However, modifying the existing tests to handle this new condition, where rows are no longer in row key order, is turning out to be a HUGE pain. I thought I could simply use BaseTest#assertValuesEqualsResultSet to verify that the correct rows were returned in the result set, but it turned out to be pretty limiting. That method essentially relies on Object.equals() to verify that correct column values were returned, which isn't always the right thing to do. For example, rs.getLong() returns 0 although rs.getObject() returns null.

          I am going to give our perf.py script a shot now and see what gains we are looking at.

          Samarth Jain added a comment -

          Parking the updated patch that handles split failures. All existing tests pass with the force_row_key_order config set to true.

          Samarth Jain added a comment -

          Previous patches had a bug: the performance gains were coming only from avoiding the merge sort, and batch loading wasn't really being parallelized as it should have been.

          With the bug fixed, the performance gains look pretty impressive. For a 10 million row table spread over 2 regions on the same region server, with 249 guideposts, here are the numbers:

          Reading all the records from the table with select * from T:

          Scanner caching - 100 (HBase's default)
          With patch - 22841 ms
          Without patch - 135282 ms
          Gain - 6x

          Scanner caching - 500
          With patch - 22030 ms
          Without patch - 99075 ms
          Gain - 4.5x

          Scanner caching - 1000:
          With patch - 20899 ms
          Without patch - 98899 ms
          Gain - 4.5 - 5x

          Scanner caching - 2000
          With patch - 31000 ms
          Without patch - 88904 ms
          Gain - 2.5 - 3x

          James Taylor added a comment -

          Impressive! Nice work.

          Samarth Jain added a comment -

          Patch with tests.

          James Taylor - do you mind reviewing, please?

          James Taylor added a comment -

          Looking good, Samarth Jain. Here's some feedback and questions:

          Can you add a row count check to this test? I noticed your other test has that already:

          +    @Test
          +    public void testUnionAllSelects() throws Exception {
          +        Set<String> keySetA = createTableAndInsertRows("TABLEA", 10, true, true);
          +        Set<String> keySetB = createTableAndInsertRows("TABLEB", 5, true, true);
          +        Set<String> keySetC = createTableAndInsertRows("TABLEC", 7, false, true);
          +        String query = "SELECT K FROM TABLEA UNION ALL SELECT K FROM TABLEB UNION ALL SELECT K FROM TABLEC";
          +        Connection conn = DriverManager.getConnection(getUrl());
          +        PreparedStatement stmt = conn.prepareStatement(query);
          +        stmt.setFetchSize(2); // force parallel fetch of scanner cache
          +        ResultSet rs = stmt.executeQuery();
          +        while (rs.next()) {
          +            String key = rs.getString(1);
          +            keySetA.remove(key);
          +            keySetB.remove(key);
          +            keySetC.remove(key);
          +        }
          +        assertEquals("Not all rows of tableA were returned", 0, keySetA.size());
          +        assertEquals("Not all rows of tableB were returned", 0, keySetB.size());
          +        assertEquals("Not all rows of tableC were returned", 0, keySetC.size());
          +    }
          +    
          

          In RoundRobinResultIterator, does the iterators.size() change as a result of a split having occurred (as otherwise it sounds like a race condition)? If that's correct, would you mind adding a comment to this effect?

          +                    // resize and replace the iterators list.
          +                    size = openIterators.size();
          +                    if (size > 0) {
          +                        iterators = getIterators();
          +                        // Possible that the number of iterators changed after the above call.
          +                        size = iterators.size();
          

          I don't think you need the Map<PeekingResultIterator, Integer> in RoundRobinResultIterators. You just need two parallel arrays: a PeekingResultIterator[] for the open iterators and an int[] with the number of records read for each open iterator. The index member variable will index into them. When an iterator is exhausted, you just remove that iterator from the PeekingResultIterator[] and remove the record read count from the int[].

          I don't think you need the PrefetchedRecordsIterator (you knew I'd find that instanceof check, didn't you?). Do your fetchNextBatch() when a next() is done and numScannersCacheExhausted == openIterators.length. Then you can do the submit like you're doing, but just return the next Tuple for each one. You'll have a parallel Tuple[] member variable in that case. You can adjust your PeekingResultIterator[] and rowsRead[] based on a Tuple being null. Then add a counter member variable based on how many Tuples you have left. You'll count down the member variable in next() calls, not calling the underlying next() until the counter is zero.

          +        for (final PeekingResultIterator itr : iterators) {
          +            Future<Tuple> future = executor
          +                    .submit(new Callable<Tuple>() {
          +                        @Override
          +                        public Tuple call() throws Exception {
          +                            // Read the next record to refill the scanner's cache.
          +                            return itr.next();
          +                        }
          +                    });
          +            futures.add(future);
          

          Minor nit: can you rename QueryPlan.isRoundRobinPossible() to QueryPlan.useRoundRobinIterator()?

          Add a series of tests in QueryCompilerTest (or a new unit test), similar to QueryCompilerTest.testGroupByOrderPreserving() and testNotGroupByOrderPreserving(), that confirm for a given query whether QueryPlan.useRoundRobinIterator() returns true or false.

          Samarth Jain added a comment -

          Can you add a row count check to this test? I noticed your other test has that already:

          Will do.

          In RoundRobinResultIterator, does the iterators.size() change as a result of a split having occurred (as otherwise it sounds like a race condition)?

          The size doesn't change because of splits. If splits happen before the start of the query, then BaseResultIterators.getIterators() handles them for us. If splits happen after the query has started executing, HBase hides them from us.
          The check below is needed because, on calling getIterators(), it is possible that we might have closed some iterators.

          +                    // resize and replace the iterators list.
          +                    size = openIterators.size();
          +                    if (size > 0) {
          +                        iterators = getIterators();
          +                        // Possible that the number of iterators changed after the above call.
          +                        size = iterators.size(); 
          

          I don't think you need the Map<PeekingResultIterator, Integer> in RoundRobinResultIterators. You just need two parallel arrays: a PeekingResultIterator[] for the open iterators and an int[] with the number of records read for each open iterator. The index member variable will index into them. When an iterator is exhausted, you just remove that iterator from the PeekingResultIterator[] and remove the record read count from the int[]

          Having two parallel arrays sounds more complicated than maintaining a map, IMHO. Gets and puts rely on the identity of the PeekingResultIterator, so it is just as performant as using arrays.

          I don't think you need the PrefetchedRecordsIterator.

          Agreed. Maintaining a separate Tuple array or list will be sufficient. There is almost always a way around instanceof checks, and I trusted you to come up with one.

          Will change the method name and add more tests in QueryCompilerTest.

          James Taylor added a comment - edited

          Having two parallel arrays sounds more complicated than maintaining a map, IMHO.

          But you don't need a map. You've got an index that will get you exactly what you need. It'd be like using a Map<Integer,Object> where the key of the map is the index. Sure, it'll work to do a map.get(3) to get the fourth element, but so would an array[3] or a list.get(3). If you don't want to do parallel arrays, then do a List<Pair<PeekingResultIterator,Integer>> or, perhaps clearer, a List<RoundRobinIteratorState>, where RoundRobinIteratorState is a class with two member variables: PeekingResultIterator iterator and int rowsRead.

          Samarth Jain added a comment -

          Agreed on the index part. I realized that I have been using the index by doing new ArrayList<>(map.keySet()).get(index).

          I like the idea of having a List of RoundRobinIteratorState. It is much better than having a list of Pairs, which is not always intuitive. Will make the change.

          Samarth Jain added a comment -

          James Taylor - attached is the updated patch. Please review. Thanks!

          Samarth Jain added a comment - edited

          Perf numbers with the latest patch.

          select * from a table with a million rows and 16 salt buckets
          scanner cache size of 100

          With patch
          Average time ~ 1800 ms

          Without patch
          Average time ~ 13300 ms

          Perf gain ~ 7.5x

          3-way union all for tables with a million rows and 16 salt buckets
          select * from tableA union all select * from tableB union all select * from tableC

          With Patch
          Average time ~ 11000 ms

          Without patch
          Average time ~ 35000 ms

          Perf gain ~ 3x

          There is more scope for improvement with UNION ALL queries. With this patch we are only parallelizing the fetching of next batches within each sub-select. In the above example we are fetching batches for 16 scanners in parallel. We could do better and parallelize fetching batches for all 48 scanners. That would get us closer to the perf gain that we are getting with regular single-select queries.

          James Taylor added a comment -

          Nice. Please file a follow up JIRA for the union all improvements.

          James Taylor added a comment -

          Thanks for the revisions, Samarth Jain. It's looking very good. The tests are fine, but I think the code can be tightened up a bit for the RoundRobinResultIterator:

          • add a Tuple member variable to RoundRobinIteratorState and get rid of the LinkedList<Tuple>.
          • return RoundRobinIteratorState for currentIterator() and List<RoundRobinIteratorState> for fetchNextBatch().
          • in next(), when currentIterator() returns, check state.getTuple() != null and return the Tuple if not null. Otherwise return state.iterator.next(). In either case, you increment state.recordsRead (seems like you have the potential for an off-by-one error, since you've already read one row with the initial next call).
          • when is this else case in getIterators() ever executed, given that you initialize this in the constructor? Seems like you just need the if statement:
            +    private List<RoundRobinIteratorState> getIterators() throws SQLException {
            +        if (closed) { return Collections.emptyList(); }
            +        if (openIterators.size() > 0 && openIterators.size() == numScannersCacheExhausted) {
            +            /*
            +             * All the scanners have exhausted their cache. Submit the scanners back to the pool so that they can fetch
            +             * the next batch of records in parallel.
            +             */
            +            initOpenIterators(fetchNextBatch());
            +        } else if (openIterators.size() == 0 && resultIterators != null) {
            +            List<PeekingResultIterator> iterators = resultIterators.getIterators();
            +            initOpenIterators(iterators);
            +        }
            +        return openIterators;
            +    }
            +
            
          • I feel like the currentIterator() logic could be simplified a bit. For example, you don't need the else { break; } because the loop will terminate in those cases (as size == 0). Also, the first else isn't needed, as you return from the if. You can likely return a RoundRobinIteratorState.EMPTY constant if there are no more rows.

          James Taylor added a comment -

          I'd also get rid of currentIterator() and just combine it with next(). Maybe something like this:

          +    @Override
          +    public Tuple next() throws SQLException {
          +        List<RoundRobinIteratorState> iterators;
          +        while ((iterators = getIterators()).size() > 0) {
          +            int size = iterators.size();
          +            index = index % size;
          +            RoundRobinIteratorState itrState = iterators.get(index);
          +            PeekingResultIterator itr = itrState.iterator;
          +            /*
          +             * Pick up the iterator only if it is open and if it hasn't already fetched more than the scanner cache size
          +             * of records.
          +             */
          +            if (itrState.numRecordsRead >= threshold) {
          +                index = (index + 1) % size;
          +            } else {
          +                Tuple tuple = null;
          +                if ((tuple = itrState.tuple) != null || (tuple = itr.next()) != null) {
          +                    itrState.tuple = null;
          +                    itrState.numRecordsRead++;
          +                    index = (index + 1) % size;
          +                    if (itrState.numRecordsRead == threshold) {
          +                        numScannersCacheExhausted++;
          +                    }
          +                    return tuple;
          +                }
          +                // The scanner is exhausted and no more records will be returned by it. Un-track and close iterator.
          +                itr.close();
          +                iterators.remove(index);
          +            }
          +        }
          +        return null;
          +    }
          
          Samarth Jain added a comment -

          When is this else case in getIterators() ever executed, given that you initialize this in the constructor? Seems like you just need the if statement:

          +    private List<RoundRobinIteratorState> getIterators() throws SQLException {
          +        if (closed) { return Collections.emptyList(); }
          +        if (openIterators.size() > 0 && openIterators.size() == numScannersCacheExhausted) {
          +            /*
          +             * All the scanners have exhausted their cache. Submit the scanners back to the pool so that they can fetch
          +             * the next batch of records in parallel.
          +             */
          +            initOpenIterators(fetchNextBatch());
          +        } else if (openIterators.size() == 0 && resultIterators != null) {
          +            List<PeekingResultIterator> iterators = resultIterators.getIterators();
          +            initOpenIterators(iterators);
          +        }
          +        return openIterators;
          +    }
          +
          

          We have two constructors:

          public RoundRobinResultIterator(ResultIterators iterators, QueryPlan plan) {
                  this.resultIterators = iterators;
                  this.plan = plan;
                  this.threshold = getThreshold();
          }
          
          public RoundRobinResultIterator(List<PeekingResultIterator> iterators, QueryPlan plan) {
                  this.resultIterators = null;
                  this.plan = plan;
                  this.threshold = getThreshold();
                  initOpenIterators(iterators);
          }
          

          The first one is called from ScanPlan and the second one from PhoenixRecordReader. The else block is used when the RoundRobinResultIterator is created from ScanPlan. The idea (borrowed from ConcatResultIterator) is to call resultIterators.getIterators() only when needed.

          I feel like the currentIterator() logic could be simplified a bit.

          Let me see what I can do here. It would probably help to inline the code within next() itself, like you suggested; that indirection isn't helping. I also like the suggestion of moving the Tuple into RoundRobinIteratorState.

          Samarth Jain added a comment -

          Thanks for the review so far, James Taylor. Based on your feedback, I have modified the patch. The changes are:
          1) Got rid of the currentIterator() method.
          2) Instead of having a RoundRobinIteratorState and mucking around with when to increment numRecordsRead, I have introduced a new inner class that delegates to the PeekingResultIterator and tracks numRecordsRead as well as the Tuple.

          Please review. Thanks!

          James Taylor added a comment -

          That's very nice, Samarth Jain. +1 to 4.x and master. Great work!

          Samarth Jain added a comment -

          Pushed to the 4.4 and master branches. Thanks for the review, James Taylor.

          Hudson added a comment -

          SUCCESS: Integrated in Phoenix-master #690 (See https://builds.apache.org/job/Phoenix-master/690/)
          PHOENIX-1779 Parallelize fetching of next batch of records for scans corresponding to queries with no order by (samarth.jain: rev 8b1d7d9bc4e35630259c60d66bc7476f96273642)

          • phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
          • phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
          • phoenix-core/src/it/java/org/apache/phoenix/iterate/RoundRobinResultIteratorIT.java
          • phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java
          • phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanBigFilterTest.java
          • phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
          • phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
          • phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
          • phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
          • phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
          • phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java
          • phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
          • phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipScanAfterManualSplitIT.java
          • phoenix-core/src/it/java/org/apache/phoenix/mapreduce/IndexToolIT.java
          • phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java
          • phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
          • phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
          • phoenix-core/src/main/java/org/apache/phoenix/iterate/RoundRobinResultIterator.java
          • phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
          • phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
          • phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
          • phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
          • phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
          • phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
          Hudson added a comment -

          SUCCESS: Integrated in Phoenix-master #692 (See https://builds.apache.org/job/Phoenix-master/692/)
          PHOENIX-1779 Addendum - Increase wait time for split to complete for RoundRobinResultIteratorIT (samarth.jain: rev b597ba817451564389f84a29d5e33a1f42d120a1)

          • phoenix-core/src/it/java/org/apache/phoenix/iterate/RoundRobinResultIteratorIT.java
          Eli Levine added a comment -

          Going through some code that uses row-value constructors got me thinking: how does the fact that rows are no longer guaranteed to be returned in row key order impact row-value constructors in Phoenix in general? At the end of http://phoenix.apache.org/paged.html we suggest that the user grab values from the last row processed and use them in the next RVC call. After PHOENIX-1779 this is no longer guaranteed to work, right? Does the optimization for PHOENIX-1779 make sense with RVCs at all? I see a few options:
          1. Force the user to supply an ORDER BY whenever they use RVCs. Seems pretty onerous.
          2. Don't do PHOENIX-1779's optimization in the presence of RVCs.
          3. Instruct the user to use the previous result's largest (or smallest, depending on PK sort order) PK value seen, instead of just grabbing values from the last row to use in the RVC. Also pretty onerous for users, IMHO.

          Imagine this simple use case: somebody is writing code for paging over Phoenix results. The first query does not use RVCs. Subsequent queries, if any, will use RVCs with values filled in based on previous results. Ideally, the caller could tell Phoenix to "run this query with or without RVCs and return results in row key order" because they want to use the results for paging and easily grab the last PK values to use in subsequent RVCs.

          Maybe the right thing to do is: (1) force row-key-ordered results in the presence of RVCs, and (2) allow users to pass in a query hint that forces ordered results for use in the first paged query with no RVCs.

          Samarth Jain, James Taylor, thoughts?

          CC Jan Fernando

          James Taylor added a comment -

          For paged queries to work, you'd need to specify an ORDER BY clause as that example does (http://phoenix.apache.org/paged.html).

          Paged queries are just one use case for RVCs. There are many others. Basically, with SQL the optimization is independent of the query results. The basic rule is that a user needs to specify the intent of the query and realize that there's no ordering guarantee if there's no ORDER BY. That's pretty standard for any RDBMS.

          Eli Levine added a comment -

          I think you are right. If RVCs are used for purposes other than paging, and I think they will be, then users might not always want results ordered. Cool, no further concerns from me.

          Enis Soztutar added a comment -

          Bulk close of all issues that have been resolved in a released version.


            People

            • Assignee: Samarth Jain
            • Reporter: Samarth Jain
            • Votes: 0
            • Watchers: 7
