Index: main/java/org/apache/hadoop/hbase/coprocessor/GetListImplementation.java =================================================================== --- main/java/org/apache/hadoop/hbase/coprocessor/GetListImplementation.java (revision 0) +++ main/java/org/apache/hadoop/hbase/coprocessor/GetListImplementation.java (revision 0) @@ -0,0 +1,142 @@ +/* + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.coprocessor; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValue.SplitKeyValue; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * A concrete GetListProtocol implementation. Its returns all rows from that region + * matching the scan criteria + */ +public class GetListImplementation extends BaseEndpointCoprocessor implements +GetListProtocol { + + protected static Log log = LogFactory.getLog(GetListImplementation.class); + private NavigableMap>> familyMap = null; + + + private KeyValue[] kvs = null; + + + public List getList(Scan scan) throws IOException { + InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment()).getRegion().getScanner(scan); + + List results = new ArrayList(); + List finalResults = new ArrayList(); + + try { + boolean hasMoreRows = false; + do { + hasMoreRows = scanner.next(results); + + for (KeyValue kv : results) { + finalResults.add(kv); + } + + results.clear(); + + } while (hasMoreRows); + + this.kvs = finalResults.toArray(new KeyValue[results.size()]); + } finally { + scanner.close(); + } + + return finalResults; + } + + public NavigableMap> getNoVersionMap() { + if (this.familyMap == null) { + getMap(); + } + if (isEmpty()) { + return null; + } + NavigableMap> returnMap = new TreeMap>( + Bytes.BYTES_COMPARATOR); + for (Map.Entry>> familyEntry : familyMap + .entrySet()) { + NavigableMap qualifierMap = new TreeMap( + Bytes.BYTES_COMPARATOR); + for (Map.Entry> qualifierEntry : familyEntry + .getValue().entrySet()) { + byte[] value = qualifierEntry.getValue().get( + qualifierEntry.getValue().firstKey()); + qualifierMap.put(qualifierEntry.getKey(), value); + } + returnMap.put(familyEntry.getKey(), qualifierMap); + } + return returnMap; + } + + public NavigableMap>> getMap() { + if (this.familyMap != null) { + return this.familyMap; + } + if (isEmpty()) { + return null; + } + this.familyMap = new TreeMap>>( + Bytes.BYTES_COMPARATOR); + for (KeyValue kv : this.kvs) { + SplitKeyValue splitKV = kv.split(); + byte[] family = splitKV.getFamily(); + NavigableMap> columnMap = familyMap + .get(family); + if (columnMap == null) { + columnMap = new TreeMap>( + Bytes.BYTES_COMPARATOR); + familyMap.put(family, columnMap); + } + byte[] qualifier = splitKV.getQualifier(); + NavigableMap versionMap = columnMap.get(qualifier); + if (versionMap == null) { + versionMap = new TreeMap(new Comparator() { + public int compare(Long l1, Long l2) { + return l2.compareTo(l1); + } + }); + columnMap.put(qualifier, versionMap); + } + Long timestamp = Bytes.toLong(splitKV.getTimestamp()); + byte[] value = splitKV.getValue(); + versionMap.put(timestamp, value); + } + return this.familyMap; + } + + public boolean isEmpty() { + return this.kvs == null || this.kvs.length == 0; + } +} Index: main/java/org/apache/hadoop/hbase/coprocessor/GetListProtocol.java =================================================================== --- main/java/org/apache/hadoop/hbase/coprocessor/GetListProtocol.java (revision 0) +++ main/java/org/apache/hadoop/hbase/coprocessor/GetListProtocol.java (revision 0) @@ -0,0 +1,37 @@ +/** + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.coprocessor; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.ipc.CoprocessorProtocol; + +/** + * Defines the functions that are to be supported in this Coprocessor. + * For each method, it takes a Scan object. + */ +public interface GetListProtocol extends CoprocessorProtocol { + List getList(Scan scan) throws IOException; + +} \ No newline at end of file Index: main/java/org/apache/hadoop/hbase/client/coprocessor/GetListClient.java =================================================================== --- main/java/org/apache/hadoop/hbase/client/coprocessor/GetListClient.java (revision 0) +++ main/java/org/apache/hadoop/hbase/client/coprocessor/GetListClient.java (revision 0) @@ -0,0 +1,187 @@ +/* + * Copyright 2011 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client.coprocessor; + +import java.io.IOException; +import java.util.List; +import java.util.NavigableMap; +import java.util.TreeMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.coprocessor.GetListProtocol; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * This client class is for invoking the GetList function deployed on the + * Region Server side via the GetListProtocol. This class will implement the + * supporting functionality for returning rows obtained from the + * GetListProtocol for each region.

+ * This will serve as the client side handler for invoking the GetList + * function. + */ +public class GetListClient { + Configuration conf; + TreeMap>>> rowMap = null; + private KeyValue[] kvs = null; + + private static final Log log = LogFactory.getLog(GetListClient.class); + + /** + * Constructor with Conf object + * + * @param cfg + */ + public GetListClient(Configuration cfg) { + this.conf = cfg; + } + + /** + * @param tableName + * @param scan + * @return + * @throws Throwable + */ + public TreeMap>>> getList( + final byte[] tableName, final Scan scan) throws Throwable { + + class RowNumCallback implements Batch.Callback> { + private List finalMap = null; + + public List getFinalMap() { + return finalMap; + } + + @Override + public void update(byte[] region, byte[] row, List resultMap) { + if (finalMap == null) { + finalMap = resultMap; + } else { + finalMap.addAll(resultMap); + } + } + } + + + HTable table = new HTable(conf, tableName); + RowNumCallback rowNumCB = new RowNumCallback(); + + + table.coprocessorExec(GetListProtocol.class, scan.getStartRow(), scan.getStopRow(), + new Batch.Call>() { + @Override + public List call(GetListProtocol instance) throws IOException { + return instance.getList(scan); + } + }, rowNumCB); + + this.kvs = rowNumCB.getFinalMap().toArray(new KeyValue[rowNumCB.getFinalMap().size()]); + + table.close(); + + return getMap(); + } + + + public TreeMap>>> getMap() { + if (this.rowMap != null) { + return this.rowMap; + } + + if (isEmpty()) { + return null; + } + + byte[] rowKey, family, qualifier, value; + Long timestamp; + + this.rowMap = new TreeMap>>>(Bytes.BYTES_COMPARATOR); + + NavigableMap>> familyMap; + NavigableMap> qualifierMap; + NavigableMap timestampMap; + + for (KeyValue kv : this.kvs) { + rowKey = kv.getRow(); + family = kv.getFamily(); + qualifier = kv.getQualifier(); + value = kv.getValue(); + timestamp = kv.getTimestamp(); + + if (rowMap.containsKey(rowKey)) { + familyMap = rowMap.get(rowKey); + + if (familyMap.containsKey(family)) { + qualifierMap = familyMap.get(family); + + if (qualifierMap.containsKey(qualifier)) { + timestampMap = qualifierMap.get(qualifier); + + if (timestampMap.containsKey(timestamp)) { + timestampMap.put(timestamp, value); + } else { + timestampMap.put(timestamp, value); + } + + } else { + timestampMap = new TreeMap(); + + timestampMap.put(timestamp, value); + qualifierMap.put(qualifier, timestampMap); + } + } else { + qualifierMap = new TreeMap>(Bytes.BYTES_COMPARATOR); + timestampMap = new TreeMap(); + + timestampMap.put(timestamp, value); + qualifierMap.put(qualifier, timestampMap); + familyMap.put(family, qualifierMap); + } + } else { + familyMap = new TreeMap>>(Bytes.BYTES_COMPARATOR); + qualifierMap = new TreeMap>(Bytes.BYTES_COMPARATOR); + timestampMap = new TreeMap(); + + timestampMap.put(timestamp, value); + qualifierMap.put(qualifier, timestampMap); + familyMap.put(family, qualifierMap); + + rowMap.put(rowKey, familyMap); + } + } + + + return this.rowMap; + } + + /** + * @return returns boolean indicating whether the getlist map has been initialized or if it is empty + */ + public boolean isEmpty() { + return this.kvs == null || this.kvs.length == 0; + } + +}