Index: src/main/java/org/apache/hadoop/hbase/client/coprocessor/SortingClient.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/client/coprocessor/SortingClient.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/client/coprocessor/SortingClient.java (revision 0) @@ -0,0 +1,301 @@ +package org.apache.hadoop.hbase.client.coprocessor; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ + +import java.io.IOException; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.coprocessor.SortingColumnInterpreter; +import org.apache.hadoop.hbase.coprocessor.SortingProtocol; + +/** + * This is client used for {@link SortingProtocol} + * @param DataType for the column used in Sorting + */ +public class SortingClient { + private static final Log log = LogFactory.getLog(SortingClient.class); + + /** + * This method is used when rows are sorted in Increasing(Ascending) order. + * @param table instance of HTable + * @param scan Scan object that contains filter and other scan related settings + * @param columnFamily Name of column family + * @param columnQualifier Name of column Qualifier on the basis of sorting will be performed + * @param colInterpreter {@link SortingColumnInterpreter} used while sorting + * @param startIndex No. of rows to skip from the beginning(startIndex is included in output) + * @param pageSize Number of rows to return in this request + * @return Result array that contains rows in Increasing order + * @throws IOException + * @throws Throwable + */ + public Result[] sortIncreasing(HTable table, final Scan scan, final byte[] columnFamily, + final byte[] columnQualifier, final SortingColumnInterpreter colInterpreter, + final int startIndex, final int pageSize) throws IOException, Throwable { + if(!validateRequest(table, scan, columnFamily, columnQualifier, startIndex, pageSize)) + { + return null; + } + return getSortedRows(table, scan, columnFamily, columnQualifier, colInterpreter, startIndex, + pageSize, false); + } + + /** + * This method is used when rows are sorted in Decreasing(Descending) order. + * @param table instance of HTable + * @param scan Scan object that contains filter and other scan related settings + * @param columnFamily Name of column family + * @param columnQualifier Name of column Qualifier on the basis of sorting will be performed + * @param colInterpreter {@link SortingColumnInterpreter} used while sorting + * @param startIndex No. 
of rows to skip from the beginning(startIndex is included in output) + * @param pageSize Number of rows to return in this request + * @return Result array that contains rows in Decreasing order + * @throws IOException + * @throws Throwable + */ + public Result[] sortDecreasing(HTable table, final Scan scan, final byte columnFamily[], + final byte[] columnQualifier, final SortingColumnInterpreter colInterpreter, + final int startIndex, final int pageSize) throws IOException, Throwable { + + if(!validateRequest(table, scan, columnFamily, columnQualifier, startIndex, pageSize)) + { + return null; + } + return getSortedRows(table, scan, columnFamily, columnQualifier, colInterpreter, startIndex, + pageSize, true); + } + + /** + * This method is used to call the coprocessor. + * @param table instance of HTable + * @param scan Scan object that contains filter and other scan related settings + * @param columnFamily Name of column family + * @param columnQualifier Name of column Qualifier on the basis of sorting will be performed + * @param colInterpreter {@link SortingColumnInterpreter} used while sorting + * @param startIndex No. of rows to skip from the beginning(startIndex is included in output) + * @param pageSize Number of rows to return in this request + * @param sortDecreasing Whether sorting in Decreasing order or not? + * @return Result[] that contains the sorted rows + * @throws IOException + * @throws Throwable + */ + private Result[] getSortedRows(HTable table, final Scan scan, final byte[] columnFamily, + final byte[] columnQualifier, final SortingColumnInterpreter colInterpreter, + final int startIndex, final int pageSize, final boolean sortDecreasing) throws IOException, + Throwable { + int numberOfRegionsInRange = table.getRegionsInRange(scan.getStartRow(), scan.getStopRow()) + .size(); + if (numberOfRegionsInRange == 0) { + log.info("No Region found in range for:" + scan.getStartRow() + "," + scan.getStopRow()); + return new Result[0]; + } else if (numberOfRegionsInRange == 1) { + log.info("Querying only one region for sorting"); + Map regionResultMap = table.coprocessorExec(SortingProtocol.class, + scan.getStartRow(), scan.getStopRow(), new Batch.Call() { + @Override + public Result[] call(SortingProtocol instance) throws IOException { + if (sortDecreasing) return instance.sortDecreasing(scan, columnFamily, columnQualifier, + colInterpreter, startIndex, pageSize, true); + else return instance.sortIncreasing(scan, columnFamily, columnQualifier, + colInterpreter, startIndex, pageSize, true); + } + }); + return regionResultMap.values().iterator().next(); + } else // numberOfRegionsInRange > 1 + { + log.info("Querying multiple regions for sorting"); + Map regionResultMap = table.coprocessorExec(SortingProtocol.class, + scan.getStartRow(), scan.getStopRow(), new Batch.Call() { + @Override + public Result[] call(SortingProtocol instance) throws IOException { + if (sortDecreasing) return instance.sortDecreasing(scan, columnFamily, columnQualifier, + colInterpreter, startIndex, pageSize, false); + else return instance.sortIncreasing(scan, columnFamily, columnQualifier, + colInterpreter, startIndex, pageSize, false); + } + }); + return mergeSort(regionResultMap, startIndex, pageSize, columnFamily, columnQualifier, + colInterpreter, sortDecreasing); + } + } + + /** + * This method is used to do the merge sort the rows from multiple regions and produce the final output + * @param regionResultMap map that contains the region key and Result[] + * @param startIndex No. 
of rows to skip from the beginning(startIndex is included in output) + * @param pageSize Number of rows to return in this request + * @param columnFamily Name of column family + * @param columnQualifier Name of column Qualifier on the basis of sorting will be performed + * @param colInterpreter {@link SortingColumnInterpreter} used while sorting + * @param sortDecreasing Whether sorting in Decreasing order or not? + * @return Result[] for final output + * @throws IOException + */ + private Result[] mergeSort(Map regionResultMap, int startIndex, int pageSize, + byte[] columnFamily, byte[] columnQualifier, SortingColumnInterpreter colInterpreter, + boolean sortDecreasing) throws IOException { + Result[][] regionResults = new Result[regionResultMap.size()][]; + int regionNum = 0; + int totalNoOfRows=0; + for (Map.Entry regionResultsEntryMap : regionResultMap.entrySet()) { + regionResults[regionNum] = regionResultsEntryMap.getValue(); + totalNoOfRows+=regionResultsEntryMap.getValue().length; + regionNum++; + } + if(totalNoOfRows < startIndex) + { + return new Result[0]; + } + Result[] finalResult = new Result[Math.min(pageSize, totalNoOfRows - (startIndex-1))]; + // arrayIndex is used to store the current index of Result array of regions while doing merge + // sort. + int[] arrayIndex = new int[regionResultMap.size()]; + boolean resultsExhausted = false; + int finalResultCurrentSize = 0; + int currentMaxorMinValueRegion = 0; + int counter = 0; + T maxOrMin = null; + T tmp = null; + if (sortDecreasing) { + while (counter < (finalResult.length + startIndex)) { + regionNum = 0; + currentMaxorMinValueRegion = 0; + maxOrMin = colInterpreter.getMinValue(); + resultsExhausted = true; + for (Result[] regionResult : regionResults) { + if ((regionResult.length - 1) < arrayIndex[regionNum]) { + continue; + } + resultsExhausted = false; + tmp = colInterpreter.getValue(regionResult[arrayIndex[regionNum]].getColumnLatest( + columnFamily, columnQualifier)); + + if (colInterpreter.compare(tmp, maxOrMin) > 0) { + currentMaxorMinValueRegion = regionNum; + maxOrMin = tmp; + } + regionNum++; + } + counter++; + if (finalResultCurrentSize == finalResult.length) { + finalResultCurrentSize = 0; + } + if (!resultsExhausted) { + finalResult[finalResultCurrentSize++] = regionResults[currentMaxorMinValueRegion][arrayIndex[currentMaxorMinValueRegion]]; + arrayIndex[currentMaxorMinValueRegion] = arrayIndex[currentMaxorMinValueRegion] + 1; + } + } + // mergesort Increasing + } else { + while (counter < (finalResult.length + startIndex)) { + regionNum = 0; + currentMaxorMinValueRegion = 0; + maxOrMin = colInterpreter.getMaxValue(); + resultsExhausted = true; + for (Result[] regionResult : regionResults) { + if ((regionResult.length - 1) < arrayIndex[regionNum]) { + continue; + } + resultsExhausted = false; + tmp = colInterpreter.getValue(regionResult[arrayIndex[regionNum]].getColumnLatest( + columnFamily, columnQualifier)); + + if (colInterpreter.compare(tmp, maxOrMin) < 0) { + currentMaxorMinValueRegion = regionNum; + maxOrMin = tmp; + } + regionNum++; + } + counter++; + if (finalResultCurrentSize == finalResult.length) { + finalResultCurrentSize = 0; + } + if (!resultsExhausted) { + finalResult[finalResultCurrentSize++] = regionResults[currentMaxorMinValueRegion][arrayIndex[currentMaxorMinValueRegion]]; + arrayIndex[currentMaxorMinValueRegion] = arrayIndex[currentMaxorMinValueRegion] + 1; + } + } + } + if (((startIndex + finalResult.length) % finalResult.length) != 0) { + return reAlignResults(startIndex, 
        finalResult.length, finalResult);
+    }
+    return finalResult;
+  }
+
+  /**
+   * This method shifts the elements of the Result[] so that the minimum or maximum row ends up at
+   * Result[0] for an increasing or a decreasing order sort respectively.
+   * @param startIndex No. of rows to skip from the beginning (startIndex is included in output)
+   * @param pageSize Number of rows to return in this request
+   * @param results array that contains the unaligned output
+   * @return Result[] with correctly aligned elements
+   */
+  private Result[] reAlignResults(int startIndex, int pageSize, Result[] results) {
+    int mod = ((startIndex + pageSize) % pageSize);
+
+    Result[] tmpResult = new Result[pageSize];
+    for (int i = 0; i < pageSize; i++) {
+      tmpResult[i] = results[mod];
+      if (mod == pageSize - 1) {
+        mod = 0;
+      } else {
+        mod++;
+      }
+      if (i == pageSize - 1) break;
+    }
+    return tmpResult;
+  }
+
+  /**
+   * This method validates the sorting request on the client side.
+   * @param table instance of HTable
+   * @param scan Scan object that contains the filter and other scan related settings
+   * @param columnFamily Name of the column family
+   * @param columnQualifier Name of the column qualifier on the basis of which sorting will be performed
+   * @param startIndex No. of rows to skip from the beginning (startIndex is included in output)
+   * @param pageSize Number of rows to return in this request
+   * @return true if the request is valid.
+   */
+  private boolean validateRequest(final HTable table, final Scan scan, final byte[] columnFamily,
+      final byte[] columnQualifier, final int startIndex, final int pageSize) {
+    if (table == null) {
+      log.error("HTable is null");
+      return false;
+    }
+    if (columnFamily == null || columnQualifier == null) {
+      log.error("Column Family or Column Qualifier is null");
+      return false;
+    }
+    if (startIndex < 0 || pageSize <= 0) {
+      log.error("Invalid request due to startIndex or pageSize");
+      return false;
+    }
+    if (scan == null) {
+      log.error("Scan is null");
+      return false;
+    }
+    if (!scan.getFamilyMap().isEmpty()
+        && (!scan.getFamilyMap().containsKey(columnFamily)
+            || (scan.getFamilyMap().get(columnFamily) != null
+                && !scan.getFamilyMap().get(columnFamily).contains(columnQualifier)))) {
+      log.error("Scan does not contain the column family and column qualifier required for sorting: "
+          + columnFamily + ":" + columnQualifier);
+      return false;
+    }
+    return true;
+  }
+}
Index: src/main/java/org/apache/hadoop/hbase/client/coprocessor/BigDecimalSortingColumnInterpreter.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/coprocessor/BigDecimalSortingColumnInterpreter.java (revision 0)
+++ src/main/java/org/apache/hadoop/hbase/client/coprocessor/BigDecimalSortingColumnInterpreter.java (revision 0)
@@ -0,0 +1,60 @@
+package org.apache.hadoop.hbase.client.coprocessor;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.math.BigDecimal; +import java.math.RoundingMode; + +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.coprocessor.SortingColumnInterpreter; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * This class is an implementation of {@link SortingColumnInterpreter} to sort on the basis + * BigDecimal columns + */ +public class BigDecimalSortingColumnInterpreter implements SortingColumnInterpreter { + + @Override + public void write(DataOutput out) throws IOException { + } + + @Override + public void readFields(DataInput in) throws IOException { + } + + @Override + public BigDecimal getValue(KeyValue kv) throws IOException { + if ((kv == null || kv.getValue() == null)) return null; + return Bytes.toBigDecimal(kv.getValue()).setScale(2, RoundingMode.HALF_EVEN); + } + + @Override + public BigDecimal getMaxValue() { + return BigDecimal.valueOf(Long.MAX_VALUE); + } + + @Override + public BigDecimal getMinValue() { + return BigDecimal.valueOf(Long.MIN_VALUE); + } + + @Override + public int compare(BigDecimal val1, BigDecimal val2) { + if ((((val1 == null) ? 1 : 0) ^ ((val2 == null) ? 1 : 0)) != 0) return ((val1 == null) ? -1 : 1); + if (val1 == null) return 0; + return val1.compareTo(val2); + } +} Index: src/main/java/org/apache/hadoop/hbase/coprocessor/SortingProtocol.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/coprocessor/SortingProtocol.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/coprocessor/SortingProtocol.java (revision 0) @@ -0,0 +1,70 @@ +package org.apache.hadoop.hbase.coprocessor; + +/** +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +import java.io.IOException; + +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.ipc.CoprocessorProtocol; + +/** + * This Endpoint interface defines the functions that can be used to Sort rows on the basis of cell values + * during Scanning. This EndPoint provides a functionality similar to "order by" clause of RDBMS. + * Insertion sort is used to sort the rows. + * + */ +public interface SortingProtocol extends CoprocessorProtocol { + + /** + * This method is used to sort rows in Increasing order. Insertion sort is used to sort the rows. 
+ * + * @param scan scan object + * @param columnFamily column family of the column used for sorting + * @param columnQualifier column qualifier of the column used for sorting + * @param colInterpreter SortingColumnInterpreter that is used to compare the cell value + * @param startIndex startIndex(inclusive) + * @param pageSize number of rows to return in this query + * @param singleRegion does this scan request spans multiple regions? + * @return Result[], rows are sorted on the basis of a column in Increasing order + * @throws IOException + */ + Result[] sortIncreasing(Scan scan, byte[] columnFamily, byte[] columnQualifier, + SortingColumnInterpreter colInterpreter, int startIndex, int pageSize, + boolean singleRegion) throws IOException; + + /** + * This method is used to sort rows in Decreasing order. Insertion sort is used to sort the rows. + * + * @param scan Scan object + * @param columnFamily column family of the column used for sorting + * @param columnQualifier column qualifier of the column used for sorting + * @param colInterpreter SortingColumnInterpreter that is used to compare the cell value + * @param startIndex startIndex(inclusive) + * @param pageSize number of rows to return in this query + * @param singleRegion does this scan request spans multiple regions? + * @return Result[], rows are sorted on the basis of a column in Decreasing order + * @throws IOException + */ + Result[] sortDecreasing(Scan scan, byte[] columnFamily, byte[] columnQualifier, + SortingColumnInterpreter colInterpreter, int startIndex, int pageSize, + boolean singleRegion) throws IOException; + +} Index: src/main/java/org/apache/hadoop/hbase/coprocessor/SortingColumnInterpreter.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/coprocessor/SortingColumnInterpreter.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/coprocessor/SortingColumnInterpreter.java (revision 0) @@ -0,0 +1,55 @@ +package org.apache.hadoop.hbase.coprocessor; + +/** +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +import java.io.IOException; +import java.util.Comparator; + +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.io.Writable; + + +/** + * + * This interface defines how a cell value will be read, written, and compared while using the {@link SortingProtocol} + * @param DataType of the column used for sorting + */ +public interface SortingColumnInterpreter extends Writable, Comparator { + + + /** + * This method is used to get the cell value as T + * @param kv KeyValue that needs to be read + * @return value of cell as T + * @throws IOException + */ + T getValue(KeyValue kv) throws IOException; + + /** + * @return the maximum value for T + */ + T getMaxValue(); + + /** + * @return the Minimum value for T + */ + T getMinValue(); + +} Index: src/main/java/org/apache/hadoop/hbase/coprocessor/SortingProtocolImplementation.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/coprocessor/SortingProtocolImplementation.java (revision 0) +++ src/main/java/org/apache/hadoop/hbase/coprocessor/SortingProtocolImplementation.java (revision 0) @@ -0,0 +1,215 @@ +package org.apache.hadoop.hbase.coprocessor; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.coprocessor.BaseEndpointCoprocessor; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * This concrete class is the implementation of {@link SortingProtocol}. 
+ */ +public class SortingProtocolImplementation extends BaseEndpointCoprocessor implements + SortingProtocol { + + protected static Log log = LogFactory.getLog(SortingProtocolImplementation.class); + + @Override + public Result[] sortIncreasing(Scan scan, byte[] columnFamily, byte[] columnQualifier, + SortingColumnInterpreter colInterpreter, int startIndex, int pageSize, boolean singleRegion) + throws IOException { + /** + * minValues, the Object[][] will store the List in Object[i][0] and value of column + * used for sorting in Object[i][1] + */ + Object[][] minValues = new Object[startIndex + pageSize][2]; + InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment()).getRegion() + .getScanner(scan); + List keyValues; + int noOfElementsInMinValues = 0; + try { + boolean hasMoreRows = false; + // loop until we run out of rows from scanner + do { + keyValues = new ArrayList(); + hasMoreRows = scanner.next(keyValues); + // loop through every keyValue within a Row to find the column used for sorting + for (KeyValue kv : keyValues) { + if ((byte) columnFamily.length == kv.getFamilyLength() + && columnQualifier.length == kv.getQualifierLength() + && Bytes.equals(columnFamily, 0, columnFamily.length, kv.getBuffer(), + kv.getFamilyOffset(), kv.getFamilyLength()) + && Bytes.equals(columnQualifier, 0, columnQualifier.length, kv.getBuffer(), + kv.getQualifierOffset(), kv.getQualifierLength())) { + T val = colInterpreter.getValue(kv); + List tmpKeyValues, tmpKeyValues1 = null; + T tmpValue, tmpValue1; + for (int i = 0; i < minValues.length; i++) { + if (i >= noOfElementsInMinValues) { + // array is not full yet. add a element at the tail of array + minValues[i][0] = keyValues; + minValues[i][1] = val; + noOfElementsInMinValues++; + break; + } + // smallest value will be at index 0 + if (colInterpreter.compare(val, ((T) minValues[i][1])) < 0) { + // current cell value is lesser than minValues[i][1], so shift all minValues[j] + // (where j>=i) element by one index to right. 
+ tmpKeyValues = ((List) (minValues[i][0])); + tmpValue = ((T) minValues[i][1]); + minValues[i][0] = keyValues; + minValues[i][1] = val; + for (int j = i + 1; j < minValues.length; j++) { + if (j >= noOfElementsInMinValues) { + minValues[j][0] = tmpKeyValues; + minValues[j][1] = tmpValue; + noOfElementsInMinValues++; + break; + } + tmpKeyValues1 = ((List) (minValues[j][0])); + tmpValue1 = ((T) minValues[j][1]); + minValues[j][0] = tmpKeyValues; + minValues[j][1] = tmpValue; + tmpKeyValues = tmpKeyValues1; + tmpValue = tmpValue1; + } + break; + } + } + break; + } + } + } while (hasMoreRows); + } finally { + scanner.close(); + } + return copySortedValueRowsIntoResultArray(startIndex, pageSize, singleRegion, minValues, + noOfElementsInMinValues); + } + + @Override + public Result[] sortDecreasing(Scan scan, byte[] columnFamily, byte[] columnQualifier, + SortingColumnInterpreter colInterpreter, int startIndex, int pageSize, boolean singleRegion) + throws IOException { + /** + * minValues, the Object[][] will store the List in Object[i][0] and value of column + * used for sorting in Object[i][1] + */ + Object[][] maxValues = new Object[startIndex + pageSize][2]; + InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment()).getRegion() + .getScanner(scan); + List keyValues; + int noOfElementsInMaxValues = 0; + try { + boolean hasMoreRows = false; + // loop until we run out of rows from scanner + do { + keyValues = new ArrayList(); + hasMoreRows = scanner.next(keyValues); + // loop through every keyValue within a Row to find the column used for sorting + for (KeyValue kv : keyValues) { + if ((byte) columnFamily.length == kv.getFamilyLength() + && columnQualifier.length == kv.getQualifierLength() + && Bytes.equals(columnFamily, 0, columnFamily.length, kv.getBuffer(), + kv.getFamilyOffset(), kv.getFamilyLength()) + && Bytes.equals(columnQualifier, 0, columnQualifier.length, kv.getBuffer(), + kv.getQualifierOffset(), kv.getQualifierLength())) { + T val = colInterpreter.getValue(kv); + List tmpKeyValues, tmpKeyValues1 = null; + T tmpValue, tmpValue1; + for (int i = 0; i < maxValues.length; i++) { + if (i >= noOfElementsInMaxValues) { + // array is not full yet. add a element at the tail of array + maxValues[i][0] = keyValues; + maxValues[i][1] = val; + noOfElementsInMaxValues++; + break; + } + // greatest value will be at index 0 + if (colInterpreter.compare(val, ((T) maxValues[i][1])) > 0) { + // current cell value is greater than maxValues[i][1], so shift all maxValues[j] + // (where j>=i) element by one index to right. + tmpKeyValues = ((List) (maxValues[i][0])); + tmpValue = ((T) maxValues[i][1]); + maxValues[i][0] = keyValues; + maxValues[i][1] = val; + for (int j = i + 1; j < maxValues.length; j++) { + if (j >= noOfElementsInMaxValues) { + maxValues[j][0] = tmpKeyValues; + maxValues[j][1] = tmpValue; + noOfElementsInMaxValues++; + break; + } + tmpKeyValues1 = ((List) (maxValues[j][0])); + tmpValue1 = ((T) maxValues[j][1]); + maxValues[j][0] = tmpKeyValues; + maxValues[j][1] = tmpValue; + tmpKeyValues = tmpKeyValues1; + tmpValue = tmpValue1; + } + break; + } + } + break; + } + } + } while (hasMoreRows); + } finally { + scanner.close(); + } + return copySortedValueRowsIntoResultArray(startIndex, pageSize, singleRegion, maxValues, + noOfElementsInMaxValues); + } + + /** + * This method creates the Result[] from the Object[][] that contains the sorted rows. 
+ * @param startIndex + * @param pageSize + * @param singleRegion + * @param sortedValues + * @param noOfElementsInSortedValues + * @return + */ + private Result[] copySortedValueRowsIntoResultArray(int startIndex, int pageSize, + boolean singleRegion, Object[][] sortedValues, int noOfElementsInSortedValues) { + Result[] finalResults; + long startTime = System.currentTimeMillis(); + if (singleRegion) { + if (noOfElementsInSortedValues < startIndex) { + return new Result[0]; + } + finalResults = new Result[noOfElementsInSortedValues - (startIndex)]; + for (int i = startIndex; i < startIndex + pageSize && i < noOfElementsInSortedValues; i++) { + finalResults[i - startIndex] = new Result((List) sortedValues[i][0]); + } + } else { + finalResults = new Result[noOfElementsInSortedValues]; + for (int i = 0; i < startIndex + pageSize && i < noOfElementsInSortedValues; i++) { + finalResults[i] = new Result((List) sortedValues[i][0]); + } + } + log.debug("Time taken for building result array:" + (System.currentTimeMillis() - startTime)); + return finalResults; + } + +} Index: src/test/java/org/apache/hadoop/hbase/coprocessor/TestSortingProtocol.java =================================================================== --- src/test/java/org/apache/hadoop/hbase/coprocessor/TestSortingProtocol.java (revision 0) +++ src/test/java/org/apache/hadoop/hbase/coprocessor/TestSortingProtocol.java (revision 0) @@ -0,0 +1,407 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package org.apache.hadoop.hbase.coprocessor; + +import static org.junit.Assert.assertEquals; +import java.math.BigDecimal; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.coprocessor.SortingClient; +import org.apache.hadoop.hbase.client.coprocessor.BigDecimalSortingColumnInterpreter; +import org.apache.hadoop.hbase.filter.PrefixFilter; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * A test class to test SortingProtocol. + */ +@Category(MediumTests.class) +public class TestSortingProtocol { + private static Log log = LogFactory.getLog(TestSortingProtocol.class); + + /** + * Creating the test infrastructure. 
+ */ + private static final byte[] TEST_TABLE = Bytes.toBytes("TestTable"); + private static final byte[] TEST_FAMILY = Bytes.toBytes("TestFamily"); + private static final byte[] TEST_QUALIFIER = Bytes.toBytes("TestQualifier"); + private static final byte[] TEST_MULTI_CQ = Bytes.toBytes("TestMultiCQ"); + private static final String TEST_TABLE_NAME = "TestTable"; + + private static byte[] ROW = Bytes.toBytes("testRow"); + private static final int ROWSIZE = 100; + private static final int regionSeperator1 = 25; + private static final int regionSeperator2 = 60; + private static byte[][] ROWS = makeN(ROW, ROWSIZE); + + private static HBaseTestingUtility util = new HBaseTestingUtility(); + private static Configuration conf = util.getConfiguration(); + + /** + * A set up method to start the test cluster. AggregateProtocolImpl is registered and will be + * loaded during region startup. Table will have 3 regions and 100 rows. + * @throws Exception + */ + @BeforeClass + public static void setupBeforeClass() throws Exception { + + conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, + "org.apache.hadoop.hbase.coprocessor.SortingProtocolImplementation"); + + util.startMiniCluster(3); + HTable table = util.createTable(TEST_TABLE, TEST_FAMILY); + util.createMultiRegions(util.getConfiguration(), table, TEST_FAMILY, new byte[][] { + HConstants.EMPTY_BYTE_ARRAY, ROWS[regionSeperator1], ROWS[regionSeperator2] }); + /** + * The testtable has one CQ which is always populated and one variable CQ for each row rowkey1: + * CF:CQ CF:CQ1 rowKey2: CF:CQ CF:CQ2 + */ + for (int i = 0; i < ROWSIZE; i++) { + Put put = new Put(ROWS[i]); + put.setWriteToWAL(false); + BigDecimal bd = new BigDecimal(i); + // every alternate row will store -i for example TestRow1:cf: + put.add(TEST_FAMILY, TEST_QUALIFIER, + Bytes.toBytes(bd.multiply(new BigDecimal(Math.pow(-1, i % 2))))); + table.put(put); + Put p2 = new Put(ROWS[i]); + put.setWriteToWAL(false); + p2.add(TEST_FAMILY, Bytes.add(TEST_MULTI_CQ, Bytes.toBytes(bd)), + Bytes.toBytes(bd.multiply(new BigDecimal("0.10")))); + table.put(p2); + } + table.close(); + } + + /** + * Shutting down the cluster + * @throws Exception + */ + @AfterClass + public static void tearDownAfterClass() throws Exception { + util.shutdownMiniCluster(); + } + + /** + * an infrastructure method to prepare rows for the testtable. + * @param base + * @param n + * @return + */ + private static byte[][] makeN(byte[] base, int n) { + byte[][] ret = new byte[n][]; + for (int i = 0; i < n; i++) { + ret[i] = Bytes.add(base, Bytes.toBytes(i)); + } + return ret; + } + + /** + * an infrastructure method to prepare rows for the testtable. 
+ * @param base + * @param n + * @return + */ + private static byte[] getN(byte[] base, int n) { + return Bytes.add(base, Bytes.toBytes(n)); + } + + /** + * ****************** Test cases for Basic Sorting ********************** + */ + + @Test + public void testIncreasing() throws Throwable { + SortingClient sortingClient = new SortingClient(); + Scan scan = new Scan(); + HTable table = new HTable(conf, TEST_TABLE_NAME); + final SortingColumnInterpreter ci = new BigDecimalSortingColumnInterpreter(); + Result[] results = sortingClient.sortIncreasing(table, scan, TEST_FAMILY, TEST_QUALIFIER, ci, + 0, 1); + System.out.println("Result[0]" + + results[0].getColumnLatest(TEST_FAMILY, TEST_QUALIFIER).getValue()); + if (results[0].getBytes().compareTo(table.get(new Get(getN(ROW, 99))).getBytes()) == 0) System.out + .println("true"); + assertEquals(table.get(new Get(getN(ROW, 99))).getBytes(), results[0].getBytes()); + } + + @Test + public void testDecreasing() throws Throwable { + SortingClient sortingClient = new SortingClient(); + Scan scan = new Scan(); + HTable table = new HTable(conf, TEST_TABLE_NAME); + final SortingColumnInterpreter ci = new BigDecimalSortingColumnInterpreter(); + Result[] results = sortingClient.sortDecreasing(table, scan, TEST_FAMILY, TEST_QUALIFIER, ci, + 0, 1); + System.out.println("Result[0]" + + results[0].getColumnLatest(TEST_FAMILY, TEST_QUALIFIER).getValue()); + if (results[0].getBytes().compareTo(table.get(new Get(getN(ROW, 98))).getBytes()) == 0) System.out + .println("true"); + assertEquals(table.get(new Get(getN(ROW, 98))).getBytes(), results[0].getBytes()); + } + + @Test + public void testIncreasingPageSizeOf3() throws Throwable { + SortingClient sortingClient = new SortingClient(); + Scan scan = new Scan(); + HTable table = new HTable(conf, TEST_TABLE_NAME); + final SortingColumnInterpreter ci = new BigDecimalSortingColumnInterpreter(); + Result[] results = sortingClient.sortIncreasing(table, scan, TEST_FAMILY, TEST_QUALIFIER, ci, + 1, 3); + System.out.println("Result[0]" + + results[0].getColumnLatest(TEST_FAMILY, TEST_QUALIFIER).getValue()); + if (results[0].getBytes().compareTo(table.get(new Get(getN(ROW, 99))).getBytes()) == 0) System.out + .println("true"); + assertEquals(table.get(new Get(getN(ROW, 97))).getBytes(), results[0].getBytes()); + assertEquals(table.get(new Get(getN(ROW, 95))).getBytes(), results[1].getBytes()); + assertEquals(table.get(new Get(getN(ROW, 93))).getBytes(), results[2].getBytes()); + } + + @Test + public void testDecreasingPageSizeOf3() throws Throwable { + SortingClient sortingClient = new SortingClient(); + Scan scan = new Scan(); + HTable table = new HTable(conf, TEST_TABLE_NAME); + final SortingColumnInterpreter ci = new BigDecimalSortingColumnInterpreter(); + Result[] results = sortingClient.sortDecreasing(table, scan, TEST_FAMILY, TEST_QUALIFIER, ci, + 1, 3); + assertEquals(table.get(new Get(getN(ROW, 96))).getBytes(), results[0].getBytes()); + assertEquals(table.get(new Get(getN(ROW, 94))).getBytes(), results[1].getBytes()); + assertEquals(table.get(new Get(getN(ROW, 92))).getBytes(), results[2].getBytes()); + } + + /** + * This test will run on a single region and have a scan range. 
+ * @throws Throwable + */ + @Test + public void testIncreasingWithScanRange() throws Throwable { + SortingClient sortingClient = new SortingClient(); + Scan scan = new Scan(); + scan.setStartRow(getN(ROW, 30)); + scan.setStopRow(getN(ROW, 40)); + HTable table = new HTable(conf, TEST_TABLE_NAME); + final SortingColumnInterpreter ci = new BigDecimalSortingColumnInterpreter(); + Result[] results = sortingClient.sortIncreasing(table, scan, TEST_FAMILY, TEST_QUALIFIER, ci, + 1, 3); + assertEquals(table.get(new Get(getN(ROW, 37))).getBytes(), results[0].getBytes()); + assertEquals(table.get(new Get(getN(ROW, 35))).getBytes(), results[1].getBytes()); + assertEquals(table.get(new Get(getN(ROW, 33))).getBytes(), results[2].getBytes()); + } + + /** + * This test will run on a single region and have a scan range. + * @throws Throwable + */ + @Test + public void testDecreasingWithScanRange() throws Throwable { + SortingClient sortingClient = new SortingClient(); + Scan scan = new Scan(); + scan.setStartRow(getN(ROW, 30)); + scan.setStopRow(getN(ROW, 40)); + HTable table = new HTable(conf, TEST_TABLE_NAME); + final SortingColumnInterpreter ci = new BigDecimalSortingColumnInterpreter(); + Result[] results = sortingClient.sortDecreasing(table, scan, TEST_FAMILY, TEST_QUALIFIER, ci, + 1, 3); + assertEquals(table.get(new Get(getN(ROW, 36))).getBytes(), results[0].getBytes()); + assertEquals(table.get(new Get(getN(ROW, 34))).getBytes(), results[1].getBytes()); + assertEquals(table.get(new Get(getN(ROW, 32))).getBytes(), results[2].getBytes()); + } + + @Test + public void testIncreasingNoRowsInScanRange() throws Throwable { + SortingClient sortingClient = new SortingClient(); + Scan scan = new Scan(); + scan.setStartRow(getN(ROW, 130)); + scan.setStopRow(getN(ROW, 140)); + HTable table = new HTable(conf, TEST_TABLE_NAME); + final SortingColumnInterpreter ci = new BigDecimalSortingColumnInterpreter(); + Result[] results = sortingClient.sortIncreasing(table, scan, TEST_FAMILY, TEST_QUALIFIER, ci, + 1, 3); + assertEquals(0, results.length); + } + + @Test + public void testIncreasingPrefixFilter() throws Throwable { + SortingClient sortingClient = new SortingClient(); + Scan scan = new Scan(); + scan.setFilter(new PrefixFilter(Bytes.toBytes("random"))); + HTable table = new HTable(conf, TEST_TABLE_NAME); + final SortingColumnInterpreter ci = new BigDecimalSortingColumnInterpreter(); + Result[] results = sortingClient.sortIncreasing(table, scan, TEST_FAMILY, TEST_QUALIFIER, ci, + 1, 3); + assertEquals(0, results.length); + } + + @Test + public void testDecreasingPrefixFilter() throws Throwable { + SortingClient sortingClient = new SortingClient(); + Scan scan = new Scan(); + scan.setFilter(new PrefixFilter(Bytes.toBytes("random"))); + HTable table = new HTable(conf, TEST_TABLE_NAME); + final SortingColumnInterpreter ci = new BigDecimalSortingColumnInterpreter(); + Result[] results = sortingClient.sortDecreasing(table, scan, TEST_FAMILY, TEST_QUALIFIER, ci, + 1, 3); + assertEquals(0, results.length); + } + + /** + * This test will run on a single region and have a scan range. 
+ * @throws Throwable + */ + @Test + public void testDecreasingNoRowsInRange() throws Throwable { + SortingClient sortingClient = new SortingClient(); + Scan scan = new Scan(); + scan.setStartRow(getN(ROW, 130)); + scan.setStopRow(getN(ROW, 140)); + HTable table = new HTable(conf, TEST_TABLE_NAME); + final SortingColumnInterpreter ci = new BigDecimalSortingColumnInterpreter(); + Result[] results = sortingClient.sortDecreasing(table, scan, TEST_FAMILY, TEST_QUALIFIER, ci, + 1, 3); + assertEquals(0, results.length); + } + + @Test + public void testIncreasingWithPageSizeGreaterThanNoOfResults() throws Throwable { + SortingClient sortingClient = new SortingClient(); + Scan scan = new Scan(); + scan.setStartRow(getN(ROW, 30)); + scan.setStopRow(getN(ROW, 40)); + HTable table = new HTable(conf, TEST_TABLE_NAME); + final SortingColumnInterpreter ci = new BigDecimalSortingColumnInterpreter(); + Result[] results = sortingClient.sortIncreasing(table, scan, TEST_FAMILY, TEST_QUALIFIER, ci, + 8, 3); + System.out.println("Result[0]" + + results[0].getColumnLatest(TEST_FAMILY, TEST_QUALIFIER).getValue()); + assertEquals(table.get(new Get(getN(ROW, 36))).getBytes(), results[0].getBytes()); + assertEquals(table.get(new Get(getN(ROW, 38))).getBytes(), results[1].getBytes()); + assertEquals(2, results.length); + } + + @Test + public void testDecreasingWithPageSizeGreaterThanNoOfResults() throws Throwable { + SortingClient sortingClient = new SortingClient(); + Scan scan = new Scan(); + scan.setStartRow(getN(ROW, 30)); + scan.setStopRow(getN(ROW, 40)); + HTable table = new HTable(conf, TEST_TABLE_NAME); + final SortingColumnInterpreter ci = new BigDecimalSortingColumnInterpreter(); + Result[] results = sortingClient.sortDecreasing(table, scan, TEST_FAMILY, TEST_QUALIFIER, ci, + 9, 3); + assertEquals(table.get(new Get(getN(ROW, 39))).getBytes(), results[0].getBytes()); + assertEquals(1, results.length); + } + + @Test + public void testIncreasingWithPageSizeGreaterThanNoOfResults1() throws Throwable { + SortingClient sortingClient = new SortingClient(); + Scan scan = new Scan(); + scan.setStartRow(getN(ROW, 30)); + scan.setStopRow(getN(ROW, 40)); + HTable table = new HTable(conf, TEST_TABLE_NAME); + final SortingColumnInterpreter ci = new BigDecimalSortingColumnInterpreter(); + Result[] results = sortingClient.sortIncreasing(table, scan, TEST_FAMILY, TEST_QUALIFIER, ci, + 10, 3); + assertEquals(0, results.length); + } + + @Test + public void testDecreasingWithPageSizeGreaterThanNoOfResults1() throws Throwable { + SortingClient sortingClient = new SortingClient(); + Scan scan = new Scan(); + scan.setStartRow(getN(ROW, 30)); + scan.setStopRow(getN(ROW, 40)); + HTable table = new HTable(conf, TEST_TABLE_NAME); + final SortingColumnInterpreter ci = new BigDecimalSortingColumnInterpreter(); + Result[] results = sortingClient.sortDecreasing(table, scan, TEST_FAMILY, TEST_QUALIFIER, ci, + 10, 3); + assertEquals(0, results.length); + } + + /** + * ****************** Error Test cases ********************** + */ + @Test + public void testWithScanAsNull() throws Throwable { + SortingClient sortingClient = new SortingClient(); + Scan scan = null; + HTable table = new HTable(conf, TEST_TABLE_NAME); + final SortingColumnInterpreter ci = new BigDecimalSortingColumnInterpreter(); + Result[] results = sortingClient.sortIncreasing(table, scan, TEST_FAMILY, TEST_QUALIFIER, ci, + 1, 3); + assertEquals(null, (Object) results); + } + + @Test + public void testWithHTableAsNull() throws Throwable { + SortingClient sortingClient = new 
SortingClient(); + Scan scan = new Scan(); + HTable table = null; + final SortingColumnInterpreter ci = new BigDecimalSortingColumnInterpreter(); + Result[] results = sortingClient.sortIncreasing(table, scan, TEST_FAMILY, TEST_QUALIFIER, ci, + 1, 3); + assertEquals(null, (Object) results); + } + + @Test + public void testWithColumnFamilyAsNull() throws Throwable { + SortingClient sortingClient = new SortingClient(); + Scan scan = new Scan(); + HTable table = null; + final SortingColumnInterpreter ci = new BigDecimalSortingColumnInterpreter(); + Result[] results = sortingClient.sortIncreasing(table, scan, null, TEST_QUALIFIER, ci, 1, 3); + assertEquals(null, (Object) results); + } + + @Test + public void testWithColumnQualifierAsNull() throws Throwable { + SortingClient sortingClient = new SortingClient(); + Scan scan = new Scan(); + HTable table = null; + final SortingColumnInterpreter ci = new BigDecimalSortingColumnInterpreter(); + Result[] results = sortingClient.sortIncreasing(table, scan, TEST_FAMILY, null, ci, 1, 3); + assertEquals(null, (Object) results); + } + + @Test + public void testWithZeroPageSize() throws Throwable { + SortingClient sortingClient = new SortingClient(); + Scan scan = new Scan(); + HTable table = null; + final SortingColumnInterpreter ci = new BigDecimalSortingColumnInterpreter(); + Result[] results = sortingClient.sortIncreasing(table, scan, TEST_FAMILY, TEST_QUALIFIER, ci, + 1, 0); + assertEquals(null, (Object) results); + } + + @Test + public void testWithNegativeStartIndex() throws Throwable { + SortingClient sortingClient = new SortingClient(); + Scan scan = new Scan(); + HTable table = null; + final SortingColumnInterpreter ci = new BigDecimalSortingColumnInterpreter(); + Result[] results = sortingClient.sortIncreasing(table, scan, TEST_FAMILY, TEST_QUALIFIER, ci, + -1, 3); + assertEquals(null, (Object) results); + } +}
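
For reviewers who want to try the patch, here is a minimal end-to-end usage sketch. It is not part of the patch: the table, family, and qualifier names are the hypothetical ones from the test above, the endpoint is assumed to be registered on the region servers (the tests set hbase.coprocessor.region.classes to org.apache.hadoop.hbase.coprocessor.SortingProtocolImplementation), and the raw, non-generic declarations mirror how the tests are written.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.BigDecimalSortingColumnInterpreter;
import org.apache.hadoop.hbase.client.coprocessor.SortingClient;
import org.apache.hadoop.hbase.coprocessor.SortingColumnInterpreter;
import org.apache.hadoop.hbase.util.Bytes;

public class SortingClientExample {
  public static void main(String[] args) throws Throwable {
    // Assumes hbase.coprocessor.region.classes includes SortingProtocolImplementation
    // (the same key the test sets via CoprocessorHost.REGION_COPROCESSOR_CONF_KEY).
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "TestTable");        // hypothetical table name
    byte[] family = Bytes.toBytes("TestFamily");         // hypothetical column family
    byte[] qualifier = Bytes.toBytes("TestQualifier");   // hypothetical column qualifier

    // Optionally restrict the scan with a start/stop row or a filter, as in the tests.
    Scan scan = new Scan();
    SortingColumnInterpreter ci = new BigDecimalSortingColumnInterpreter();

    // Skip the single smallest row (startIndex = 1) and fetch the next 3 rows in increasing order.
    SortingClient sortingClient = new SortingClient();
    Result[] page = sortingClient.sortIncreasing(table, scan, family, qualifier, ci, 1, 3);
    for (Result r : page) {
      System.out.println(Bytes.toStringBinary(r.getRow()) + " -> "
          + Bytes.toBigDecimal(r.getColumnLatest(family, qualifier).getValue()));
    }
    table.close();
  }
}

The same call against a scan range that spans several regions goes through the client-side merge in SortingClient.mergeSort; against a single region the endpoint result is returned as-is.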
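
The patch ships only BigDecimalSortingColumnInterpreter. To illustrate the getValue/getMaxValue/getMinValue/compare contract of SortingColumnInterpreter, here is a hypothetical interpreter for columns written with Bytes.toBytes(long); it is only a sketch, not part of the patch, and it spells out the interface's type parameter (documented above as the data type of the sort column, though the angle brackets appear mangled in this patch text) so that compare(Long, Long) overrides Comparator.compare.

package org.apache.hadoop.hbase.client.coprocessor;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.coprocessor.SortingColumnInterpreter;
import org.apache.hadoop.hbase.util.Bytes;

/** Hypothetical interpreter for cells encoded with Bytes.toBytes(long); not included in this patch. */
public class LongSortingColumnInterpreter implements SortingColumnInterpreter<Long> {

  @Override
  public void write(DataOutput out) throws IOException {
    // stateless, nothing to serialize
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    // stateless, nothing to deserialize
  }

  @Override
  public Long getValue(KeyValue kv) throws IOException {
    if (kv == null || kv.getValue() == null) return null;
    return Bytes.toLong(kv.getValue());
  }

  @Override
  public Long getMaxValue() {
    return Long.MAX_VALUE;
  }

  @Override
  public Long getMinValue() {
    return Long.MIN_VALUE;
  }

  @Override
  public int compare(Long val1, Long val2) {
    // Nulls sort before non-nulls, matching the BigDecimal interpreter in this patch.
    if (val1 == null && val2 == null) return 0;
    if (val1 == null) return -1;
    if (val2 == null) return 1;
    return val1.compareTo(val2);
  }
}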