Index: src/main/java/org/apache/hadoop/hbase/client/coprocessor/SortingClient.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/coprocessor/SortingClient.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/client/coprocessor/SortingClient.java (revision 0) @@ -0,0 +1,327 @@ +package org.apache.hadoop.hbase.client.coprocessor; + +/** +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +import java.io.IOException; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.coprocessor.Batch; +import org.apache.hadoop.hbase.coprocessor.SortingColumnInterpreter; +import org.apache.hadoop.hbase.coprocessor.SortingProtocol; + + + +/** + * This is client used for {@link SortingProtocol} + * + * @param DataType for the column used in Sorting + */ +public class SortingClient { + private static final Log log = LogFactory.getLog(SortingClient.class); + Configuration conf; + + public SortingClient(Configuration conf) { + this.conf = conf; + } + + /** + * @param table + * @param scan + * @param columnFamily + * @param columnQualifier + * @param colInterpreter + * @param startIndex + * @param pageSize + * @return + * @throws IOException + * @throws Throwable + */ + public Result[] sortIncreasing(HTable table, final Scan scan, final byte[] columnFamily, + final byte[] columnQualifier, final SortingColumnInterpreter colInterpreter, + final int startIndex, final int pageSize) throws IOException, Throwable { + validateRequest(scan, columnFamily, columnQualifier, startIndex, pageSize); + return getSortedRows(table, scan, columnFamily, columnQualifier, colInterpreter, startIndex, + pageSize, false); + + } + + /** + * @param table + * @param scan + * @param columnFamily + * @param columnQualifier + * @param colInterpreter + * @param startIndex + * @param pageSize + * @return + * @throws IOException + * @throws Throwable + */ + public Result[] sortDecreasing(HTable table, final Scan scan, final byte columnFamily[], + final byte[] columnQualifier, final SortingColumnInterpreter colInterpreter, + final int startIndex, final int pageSize) throws IOException, Throwable { + + validateRequest(scan, columnFamily, columnQualifier, startIndex, pageSize); + return getSortedRows(table, scan, columnFamily, columnQualifier, colInterpreter, startIndex, + pageSize, true); + + } + + /** + * @param table + * @param scan + * @param columnFamily + * @param columnQualifier + * @param colInterpreter + * @param startIndex + * @param pageSize + * 
@param sortDecreasing + * @return + * @throws IOException + * @throws Throwable + */ + private Result[] getSortedRows(HTable table, final Scan scan, final byte[] columnFamily, + final byte[] columnQualifier, final SortingColumnInterpreter colInterpreter, + final int startIndex, final int pageSize, final boolean sortDecreasing) throws IOException, + Throwable { + + int numberOfRegionsInRange = table.getRegionsInRange(scan.getStartRow(), scan.getStopRow()) + .size(); + if (numberOfRegionsInRange == 0) { + log.info("No Region found in range for:" + scan.getStartRow() + "," + scan.getStopRow()); + return new Result[0]; + } else if (numberOfRegionsInRange == 1) { + System.out.println("Querying only one region"); + Map regionResultMap = table.coprocessorExec(SortingProtocol.class, + scan.getStartRow(), scan.getStopRow(), new Batch.Call() { + @Override + public Result[] call(SortingProtocol instance) throws IOException { + if (sortDecreasing) return instance.sortDecreasing(scan, columnFamily, columnQualifier, + colInterpreter, startIndex, pageSize, true); + else return instance.sortIncreasing(scan, columnFamily, columnQualifier, + colInterpreter, startIndex, pageSize, true); + } + }); + + return regionResultMap.values().iterator().next(); + + } else // numberOfRegionsInRange > 1 + { + System.out.println("Querying multiple regions"); + Map regionResultMap = table.coprocessorExec(SortingProtocol.class, + scan.getStartRow(), scan.getStopRow(), new Batch.Call() { + @Override + public Result[] call(SortingProtocol instance) throws IOException { + if (sortDecreasing) return instance.sortDecreasing(scan, columnFamily, columnQualifier, + colInterpreter, startIndex, pageSize, false); + else return instance.sortIncreasing(scan, columnFamily, columnQualifier, + colInterpreter, startIndex, pageSize, false); + } + }); + + if (sortDecreasing) return mergeSortDecreasing(regionResultMap, startIndex, pageSize, + columnFamily, columnQualifier, colInterpreter); + else return mergeSortIncreasing(regionResultMap, startIndex, pageSize, columnFamily, + columnQualifier, colInterpreter); + } + } + + /** + * @param regionResultMap + * @param startIndex + * @param pageSize + * @param columnFamily + * @param columnQualifier + * @param colInterpreter + * @return + * @throws IOException + */ + private Result[] mergeSortDecreasing(Map regionResultMap, int startIndex, + int pageSize, byte[] columnFamily, byte[] columnQualifier, + SortingColumnInterpreter colInterpreter) throws IOException { + + Result[][] regionResults = new Result[regionResultMap.size()][]; + int[] arrayIndex = new int[regionResultMap.size()]; + Result[] finalResult = new Result[pageSize]; + T max = null; + T tmp = null; + int regionNum = 0; + int finalResultCurrentSize = 0; + int currentMaxValueRegion = 0; + int counter = 0; + boolean resultsExhausted = false; + for (Map.Entry regionResultsEntryMap : regionResultMap.entrySet()) { + regionResults[regionNum] = regionResultsEntryMap.getValue(); + regionNum++; + } + + // this is the initial rough draft for sorting the Rows when more than one region are queried. 
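+    // Each region has already returned its first (startIndex + pageSize) rows sorted in
+    // decreasing order, so this loop is a k-way merge: on every pass it looks at the current
+    // head row of each region's array, copies the largest one into the circular finalResult
+    // buffer, and advances that region's index until (startIndex + pageSize) rows are consumed.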
+ while (counter < (pageSize + startIndex)) { + regionNum = 0; + currentMaxValueRegion = 0; + max = colInterpreter.getMinValue(); + resultsExhausted = true; + for (Result[] regionResult : regionResults) { + if ((regionResult.length - 1) == arrayIndex[regionNum]) { + continue; + } + resultsExhausted = false; + tmp = colInterpreter.getValue(regionResult[arrayIndex[regionNum]].getColumnLatest( + columnFamily, columnQualifier)); + + if (colInterpreter.compare(tmp, max) > 0) { + currentMaxValueRegion = regionNum; + max = tmp; + } + regionNum++; + } + counter++; + if (finalResultCurrentSize == finalResult.length) { + finalResultCurrentSize = 0; + } + if (!resultsExhausted) { + finalResult[finalResultCurrentSize++] = regionResults[currentMaxValueRegion][arrayIndex[currentMaxValueRegion]]; + arrayIndex[currentMaxValueRegion] = arrayIndex[currentMaxValueRegion] + 1; + } + } + + if (((startIndex + pageSize) % pageSize) != 0) { + return reAlignResults(startIndex, pageSize, finalResult); + } + return finalResult; + + } + + /** + * @param regionResultMap + * @param startIndex + * @param pageSize + * @param columnFamily + * @param columnQualifier + * @param colInterpreter + * @return + * @throws IOException + */ + private Result[] mergeSortIncreasing(Map regionResultMap, int startIndex, + int pageSize, byte[] columnFamily, byte[] columnQualifier, + SortingColumnInterpreter colInterpreter) throws IOException { + + Result[][] regionResults = new Result[regionResultMap.size()][]; + int[] arrayIndex = new int[regionResultMap.size()]; + Result[] finalResult = new Result[pageSize]; + T min = null; + T tmp = null; + int regionNum = 0; + int finalResultCurrentSize = 0; + int currentMaxValueRegion = 0; + + for (Map.Entry regionResultsEntryMap : regionResultMap.entrySet()) { + regionResults[regionNum] = regionResultsEntryMap.getValue(); + regionNum++; + } + + boolean resultsExhausted = false; + while (finalResultCurrentSize < pageSize && !resultsExhausted) { + regionNum = 0; + currentMaxValueRegion = 0; + min = colInterpreter.getMaxValue(); + resultsExhausted = true; + for (Result[] regionResult : regionResults) { + if ((regionResult.length - 1) > arrayIndex[regionNum]) { + continue; + } + resultsExhausted = false; + tmp = colInterpreter.getValue(regionResult[arrayIndex[regionNum]].getColumnLatest( + columnFamily, columnQualifier)); + + if (colInterpreter.compare(tmp, min) < 0) { + currentMaxValueRegion = regionNum; + min = tmp; + } + regionNum++; + } + if (!resultsExhausted) { + finalResult[finalResultCurrentSize++] = regionResults[currentMaxValueRegion][arrayIndex[currentMaxValueRegion]]; + arrayIndex[currentMaxValueRegion] = arrayIndex[currentMaxValueRegion] + 1; + } + } + if (((startIndex + pageSize) % pageSize) != 0 && finalResultCurrentSize == pageSize) { + return reAlignResults(startIndex, pageSize, finalResult); + } else if (((startIndex + finalResultCurrentSize) % pageSize) != 0) { + + } + return finalResult; + + } + + /** + * @param startIndex + * @param pageSize + * @param finalResult + * @return + */ + private Result[] reAlignResults(int startIndex, int pageSize, Result[] finalResult) { + int mod = ((startIndex + pageSize) % pageSize); + + Result[] tmpResult = new Result[pageSize]; + for (int i = 0; i < pageSize; i++) { + tmpResult[i] = finalResult[mod]; + if (mod == pageSize - 1) { + mod = 0; + } else { + mod++; + } + if (i == pageSize - 1) break; + } + return tmpResult; + + } + + /** + * @param scan + * @param columnFamily + * @param columnQualifier + * @param startIndex + * @param pageSize + * 
@return true if the scan and paging parameters are usable for sorting; false otherwise
+   */
+  private boolean validateRequest(final Scan scan, final byte[] columnFamily,
+      final byte[] columnQualifier, final int startIndex, final int pageSize) {
+    if (startIndex < 0 || pageSize <= 0) {
+      log.info("Invalid request: startIndex must be >= 0 and pageSize must be > 0");
+      return false;
+    }
+
+    if (!scan.getFamilyMap().isEmpty()
+        && (!scan.getFamilyMap().containsKey(columnFamily)
+            || (scan.getFamilyMap().get(columnFamily) != null
+                && !scan.getFamilyMap().get(columnFamily).contains(columnQualifier)))) {
+      log.info("Scan does not contain the column family and column qualifier required for sorting: "
+          + columnFamily + ":" + columnQualifier);
+      return false;
+    }
+    return true;
+  }
+
+}
Index: src/main/java/org/apache/hadoop/hbase/client/coprocessor/BigDecimalSortingColumnInterpreter.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/coprocessor/BigDecimalSortingColumnInterpreter.java	(revision 0)
+++ src/main/java/org/apache/hadoop/hbase/client/coprocessor/BigDecimalSortingColumnInterpreter.java	(revision 0)
@@ -0,0 +1,69 @@
+package org.apache.hadoop.hbase.client.coprocessor;
+
+/**
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.math.BigDecimal;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.coprocessor.SortingColumnInterpreter;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * This class is an implementation of {@link SortingColumnInterpreter} used to sort rows on the
+ * basis of a BigDecimal-valued column.
+ */
+public class BigDecimalSortingColumnInterpreter implements SortingColumnInterpreter<BigDecimal> {
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    // stateless; nothing to serialize
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    // stateless; nothing to deserialize
+  }
+
+  @Override
+  public BigDecimal getValue(KeyValue kv) throws IOException {
+    if (kv == null || kv.getValue() == null) return null;
+    // assumes the cell was written with the Bytes.toBytes(BigDecimal) encoding
+    return Bytes.toBigDecimal(kv.getValue());
+  }
+
+  @Override
+  public BigDecimal getMaxValue() {
+    // sentinel used by the client-side merge; assumes column values fit within the long range
+    return BigDecimal.valueOf(Long.MAX_VALUE);
+  }
+
+  @Override
+  public BigDecimal getMinValue() {
+    return BigDecimal.valueOf(Long.MIN_VALUE);
+  }
+
+  @Override
+  public int compare(BigDecimal val1, BigDecimal val2) {
+    // exactly one of the two values is null: a null value is treated as smaller than any non-null value
+    if ((((val1 == null) ? 1 : 0) ^ ((val2 == null) ? 1 : 0)) != 0) return ((val1 == null) ?
-1 : 1); + if (val1 == null) return 0; + return val1.compareTo(val2); + } + +} Index: src/main/java/org/apache/hadoop/hbase/coprocessor/SortingProtocol.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/coprocessor/SortingProtocol.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/coprocessor/SortingProtocol.java (revision 0) @@ -0,0 +1,70 @@ +package org.apache.hadoop.hbase.coprocessor; + +/** +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +import java.io.IOException; + +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.ipc.CoprocessorProtocol; + +/** + * This Endpoint interface defines the functions that can be used to Sort rows on the basis of cell values + * during Scanning. This EndPoint provides a functionality similar to "order by" clause of RDBMS. + * Insertion sort is used to sort the rows. + * + */ +public interface SortingProtocol extends CoprocessorProtocol { + + /** + * This method is used to sort rows in Increasing order. Insertion sort is used to sort the rows. + * + * @param scan scan object + * @param columnFamily column family of the column used for sorting + * @param columnQualifier column qualifier of the column used for sorting + * @param colInterpreter SortingColumnInterpreter that is used to compare the cell value + * @param startIndex startIndex(inclusive) + * @param pageSize number of rows to return in this query + * @param singleRegion does this scan request spans multiple regions? + * @return Result[], rows are sorted on the basis of a column in Increasing order + * @throws IOException + */ + Result[] sortIncreasing(Scan scan, byte[] columnFamily, byte[] columnQualifier, + SortingColumnInterpreter colInterpreter, int startIndex, int pageSize, + boolean singleRegion) throws IOException; + + /** + * This method is used to sort rows in Decreasing order. Insertion sort is used to sort the rows. + * + * @param scan Scan object + * @param columnFamily column family of the column used for sorting + * @param columnQualifier column qualifier of the column used for sorting + * @param colInterpreter SortingColumnInterpreter that is used to compare the cell value + * @param startIndex startIndex(inclusive) + * @param pageSize number of rows to return in this query + * @param singleRegion does this scan request spans multiple regions? 
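+   *          (when true, the region applies the startIndex offset itself and returns at most
+   *          pageSize rows; when false, it returns its first startIndex + pageSize sorted rows
+   *          so that the client can merge and re-apply the offset across regions)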
+ * @return Result[], rows are sorted on the basis of a column in Decreasing order + * @throws IOException + */ + Result[] sortDecreasing(Scan scan, byte[] columnFamily, byte[] columnQualifier, + SortingColumnInterpreter colInterpreter, int startIndex, int pageSize, + boolean singleRegion) throws IOException; + +} Index: src/main/java/org/apache/hadoop/hbase/coprocessor/SortingColumnInterpreter.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/coprocessor/SortingColumnInterpreter.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/coprocessor/SortingColumnInterpreter.java (revision 0) @@ -0,0 +1,55 @@ +package org.apache.hadoop.hbase.coprocessor; + +/** +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +import java.io.IOException; +import java.util.Comparator; + +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.io.Writable; + + +/** + * + * This interface defines how a cell value will be read, written, and compared while using the {@link SortingProtocol} + * @param DataType of the column used for sorting + */ +public interface SortingColumnInterpreter extends Writable, Comparator { + + + /** + * This method is used to get the cell value as T + * @param kv KeyValue that needs to be read + * @return value of cell as T + * @throws IOException + */ + T getValue(KeyValue kv) throws IOException; + + /** + * @return the maximum value for T + */ + T getMaxValue(); + + /** + * @return the Minimum value for T + */ + T getMinValue(); + +} Index: src/main/java/org/apache/hadoop/hbase/coprocessor/SortingProtocolImplementation.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/coprocessor/SortingProtocolImplementation.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/coprocessor/SortingProtocolImplementation.java (revision 0) @@ -0,0 +1,218 @@ +package org.apache.hadoop.hbase.coprocessor; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. 
+ */ + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.coprocessor.BaseEndpointCoprocessor; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * This concrete class is the implementation of {@link SortingProtocol}. + */ +public class SortingProtocolImplementation extends BaseEndpointCoprocessor implements + SortingProtocol { + + protected static Log log = LogFactory.getLog(SortingProtocolImplementation.class); + + @Override + public Result[] sortIncreasing(Scan scan, byte[] columnFamily, byte[] columnQualifier, + SortingColumnInterpreter colInterpreter, int startIndex, int pageSize, boolean singleRegion) + throws IOException { + /** + * minValues, the Object[][] will store the List in Object[i][0] and value of column + * used for sorting in Object[i][1] + */ + Object[][] minValues = new Object[startIndex + pageSize][2]; + InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment()).getRegion() + .getScanner(scan); + List keyValues; + int noOfElementsInMinValues = 0; + try { + boolean hasMoreRows = false; + // loop until we run out of rows from scanner + do { + keyValues = new ArrayList(); + hasMoreRows = scanner.next(keyValues); + // loop through every keyValue within a Row to find the column used for sorting + for (KeyValue kv : keyValues) { + if ((byte) columnFamily.length == kv.getFamilyLength() + && columnQualifier.length == kv.getQualifierLength() + && Bytes.equals(columnFamily, 0, columnFamily.length, kv.getBuffer(), + kv.getFamilyOffset(), kv.getFamilyLength()) + && Bytes.equals(columnQualifier, 0, columnQualifier.length, kv.getBuffer(), + kv.getQualifierOffset(), kv.getQualifierLength())) { + T val = colInterpreter.getValue(kv); + List tmpKeyValues, tmpKeyValues1 = null; + T tmpValue, tmpValue1; + for (int i = 0; i < minValues.length; i++) { + if (i >= noOfElementsInMinValues) { + // array is not full yet. add a element at the tail of array + minValues[i][0] = keyValues; + minValues[i][1] = val; + noOfElementsInMinValues++; + break; + } + // smallest value will be at index 0 + if (colInterpreter.compare(val, ((T) minValues[i][1])) < 0) { + // current cell value is lesser than minValues[i][1], so shift all minValues[j] + // (where j>=i) element by one index to right. 
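+                // the evicted (row, value) pair then ripples down the array until it reaches a
+                // free slot or falls off the end, so minValues always holds the smallest rows
+                // seen so far in ascending order, capped at (startIndex + pageSize) entries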
+ tmpKeyValues = ((List) (minValues[i][0])); + tmpValue = ((T) minValues[i][1]); + minValues[i][0] = keyValues; + minValues[i][1] = val; + for (int j = i + 1; j < minValues.length; j++) { + if (j >= noOfElementsInMinValues) { + minValues[j][0] = tmpKeyValues; + minValues[j][1] = tmpValue; + noOfElementsInMinValues++; + break; + } + tmpKeyValues1 = ((List) (minValues[j][0])); + tmpValue1 = ((T) minValues[j][1]); + minValues[j][0] = tmpKeyValues; + minValues[j][1] = tmpValue; + tmpKeyValues = tmpKeyValues1; + tmpValue = tmpValue1; + } + break; + } + } + break; + } + } + } while (hasMoreRows); + } finally { + scanner.close(); + } + + return copyMaxValueRowsIntoResultArray(startIndex, pageSize, singleRegion, minValues, + noOfElementsInMinValues); + + } + + @Override + public Result[] sortDecreasing(Scan scan, byte[] columnFamily, byte[] columnQualifier, + SortingColumnInterpreter colInterpreter, int startIndex, int pageSize, boolean singleRegion) + throws IOException { + /** + * minValues, the Object[][] will store the List in Object[i][0] and value of column + * used for sorting in Object[i][1] + */ + Object[][] maxValues = new Object[startIndex + pageSize][2]; + InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment()).getRegion() + .getScanner(scan); + List keyValues; + int noOfElementsInMaxValues = 0; + try { + boolean hasMoreRows = false; + // loop until we run out of rows from scanner + do { + keyValues = new ArrayList(); + hasMoreRows = scanner.next(keyValues); + // loop through every keyValue within a Row to find the column used for sorting + for (KeyValue kv : keyValues) { + if ((byte) columnFamily.length == kv.getFamilyLength() + && columnQualifier.length == kv.getQualifierLength() + && Bytes.equals(columnFamily, 0, columnFamily.length, kv.getBuffer(), + kv.getFamilyOffset(), kv.getFamilyLength()) + && Bytes.equals(columnQualifier, 0, columnQualifier.length, kv.getBuffer(), + kv.getQualifierOffset(), kv.getQualifierLength())) { + T val = colInterpreter.getValue(kv); + List tmpKeyValues, tmpKeyValues1 = null; + T tmpValue, tmpValue1; + for (int i = 0; i < maxValues.length; i++) { + if (i >= noOfElementsInMaxValues) { + // array is not full yet. add a element at the tail of array + maxValues[i][0] = keyValues; + maxValues[i][1] = val; + noOfElementsInMaxValues++; + break; + } + // greatest value will be at index 0 + if (colInterpreter.compare(val, ((T) maxValues[i][1])) > 0) { + // current cell value is greater than maxValues[i][1], so shift all maxValues[j] + // (where j>=i) element by one index to right. + tmpKeyValues = ((List) (maxValues[i][0])); + tmpValue = ((T) maxValues[i][1]); + maxValues[i][0] = keyValues; + maxValues[i][1] = val; + for (int j = i + 1; j < maxValues.length; j++) { + if (j >= noOfElementsInMaxValues) { + maxValues[j][0] = tmpKeyValues; + maxValues[j][1] = tmpValue; + noOfElementsInMaxValues++; + break; + } + tmpKeyValues1 = ((List) (maxValues[j][0])); + tmpValue1 = ((T) maxValues[j][1]); + maxValues[j][0] = tmpKeyValues; + maxValues[j][1] = tmpValue; + tmpKeyValues = tmpKeyValues1; + tmpValue = tmpValue1; + + } + break; + } + } + break; + } + } + } while (hasMoreRows); + } finally { + scanner.close(); + } + return copyMaxValueRowsIntoResultArray(startIndex, pageSize, singleRegion, maxValues, + noOfElementsInMaxValues); + } + + /** + * This method creates the Result[] from the Object[][] that contains the sorted rows. 
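+   * For a single-region scan the startIndex offset is applied here on the region server and at
+   * most pageSize rows are returned; for a multi-region scan the first (startIndex + pageSize)
+   * rows are returned so that the client can apply the offset while merging region results.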
+   * @param startIndex start offset (inclusive) requested by the client
+   * @param pageSize number of rows to return per page
+   * @param singleRegion true if the scan was served by a single region
+   * @param sortedValues sorted rows; element [i][0] holds the row's KeyValue list and [i][1] the
+   *          value of the sort column for that row
+   * @param noOfElementsInMaxValue number of populated entries in sortedValues
+   * @return the sorted rows wrapped as Result objects
+   */
+  private Result[] copyMaxValueRowsIntoResultArray(int startIndex, int pageSize,
+      boolean singleRegion, Object[][] sortedValues, int noOfElementsInMaxValue) {
+    Result[] finalResults;
+    long startTime = System.currentTimeMillis();
+    if (singleRegion) {
+      if (noOfElementsInMaxValue < startIndex) {
+        return new Result[0];
+      }
+      finalResults = new Result[pageSize];
+      for (int i = startIndex; i < startIndex + pageSize && i < noOfElementsInMaxValue; i++) {
+        finalResults[i - startIndex] = new Result((List<KeyValue>) sortedValues[i][0]);
+      }
+    } else {
+      finalResults = new Result[startIndex + pageSize];
+      for (int i = 0; i < startIndex + pageSize && i < noOfElementsInMaxValue; i++) {
+        finalResults[i] = new Result((List<KeyValue>) sortedValues[i][0]);
+      }
+    }
+    log.debug("Time taken for building result array:" + (System.currentTimeMillis() - startTime));
+    return finalResults;
+  }
+
+}
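
Usage sketch: assuming the SortingProtocolImplementation endpoint has been loaded on the target
table, a caller could page through rows sorted by a BigDecimal column roughly as follows. The
table name "sales" and the column "cf:amount" are placeholders, and error handling is reduced to
propagating Throwable as the client methods declare it.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.client.coprocessor.BigDecimalSortingColumnInterpreter;
    import org.apache.hadoop.hbase.client.coprocessor.SortingClient;
    import org.apache.hadoop.hbase.util.Bytes;

    public class SortingClientExample {
      public static void main(String[] args) throws Throwable {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "sales");                        // placeholder table
        try {
          Scan scan = new Scan();
          scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("amount"));  // placeholder column
          SortingClient client = new SortingClient(conf);
          // first page of 20 rows, largest "amount" first; the second page would use startIndex 20
          Result[] page = client.sortDecreasing(table, scan, Bytes.toBytes("cf"),
              Bytes.toBytes("amount"), new BigDecimalSortingColumnInterpreter(), 0, 20);
          for (Result row : page) {
            if (row != null) System.out.println(Bytes.toString(row.getRow()));
          }
        } finally {
          table.close();
        }
      }
    }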