diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
index 615c854..59cad11 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
@@ -21,7 +21,6 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
 
@@ -34,6 +33,8 @@
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.serde2.KeySerializer;
+import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.Serializer;
 import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -42,6 +43,7 @@
 import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector.StandardUnion;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
+import org.apache.hadoop.io.BinaryComparable;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.Text;
@@ -73,7 +75,7 @@
   // TODO: we use MetadataTypedColumnsetSerDe for now, till DynamicSerDe is
   // ready
-  protected transient Serializer keySerializer;
+  protected transient KeySerializer keySerializer;
   protected transient boolean keyIsText;
   protected transient Serializer valueSerializer;
   transient int tag;
@@ -126,7 +128,7 @@ protected void initializeOp(Configuration hconf) throws HiveException {
       LOG.info("Using tag = " + tag);
 
       TableDesc keyTableDesc = conf.getKeySerializeInfo();
-      keySerializer = (Serializer) keyTableDesc.getDeserializerClass()
+      keySerializer = (KeySerializer) keyTableDesc.getDeserializerClass()
          .newInstance();
       keySerializer.initialize(null, keyTableDesc.getProperties());
       keyIsText = keySerializer.getSerializedClass().equals(Text.class);
@@ -167,7 +169,10 @@ private TopNHash createTopKHash() {
   protected transient HiveKey keyWritable = new HiveKey();
   protected transient Writable value;
 
-  transient StructObjectInspector keyObjectInspector;
+  transient ObjectInspector[] distKeyObjectInspector;
+  transient UnionObjectInspector unionKeyObjectInspector;
+  transient boolean[] keyDescending;
+
   transient StructObjectInspector valueObjectInspector;
   transient ObjectInspector[] partitionObjectInspectors;
 
@@ -200,37 +205,23 @@ private TopNHash createTopKHash() {
    * If distinctColIndices is empty, the object inspector is same as
    * {@link Operator#initEvaluatorsAndReturnStruct(ExprNodeEvaluator[], List, ObjectInspector)}
    */
-  protected static StructObjectInspector initEvaluatorsAndReturnStruct(
-      ExprNodeEvaluator[] evals, List<List<Integer>> distinctColIndices,
-      List<String> outputColNames,
-      int length, ObjectInspector rowInspector)
+  protected static UnionObjectInspector initDistinctObjectInspector(ExprNodeEvaluator[] evals,
+      List<List<Integer>> distinctColIndices, ObjectInspector rowInspector)
       throws HiveException {
-    int inspectorLen = evals.length > length ? length + 1 : evals.length;
-    List<ObjectInspector> sois = new ArrayList<ObjectInspector>(inspectorLen);
-
-    // keys
-    ObjectInspector[] fieldObjectInspectors = initEvaluators(evals, 0, length, rowInspector);
-    sois.addAll(Arrays.asList(fieldObjectInspectors));
-
-    if (outputColNames.size() > length) {
-      // union keys
-      List<ObjectInspector> uois = new ArrayList<ObjectInspector>();
-      for (List<Integer> distinctCols : distinctColIndices) {
-        List<String> names = new ArrayList<String>();
-        List<ObjectInspector> eois = new ArrayList<ObjectInspector>();
-        int numExprs = 0;
-        for (int i : distinctCols) {
-          names.add(HiveConf.getColumnInternalName(numExprs));
-          eois.add(evals[i].initialize(rowInspector));
-          numExprs++;
-        }
-        uois.add(ObjectInspectorFactory.getStandardStructObjectInspector(names, eois));
+    // union keys
+    List<ObjectInspector> uois = new ArrayList<ObjectInspector>();
+    for (List<Integer> distinctCols : distinctColIndices) {
+      List<String> names = new ArrayList<String>();
+      List<ObjectInspector> eois = new ArrayList<ObjectInspector>();
+      int numExprs = 0;
+      for (int i : distinctCols) {
+        names.add(HiveConf.getColumnInternalName(numExprs));
+        eois.add(evals[i].initialize(rowInspector));
+        numExprs++;
       }
-      UnionObjectInspector uoi =
-        ObjectInspectorFactory.getStandardUnionObjectInspector(uois);
-      sois.add(uoi);
+      uois.add(ObjectInspectorFactory.getStandardStructObjectInspector(names, eois));
     }
-    return ObjectInspectorFactory.getStandardStructObjectInspector(outputColNames, sois );
+    return ObjectInspectorFactory.getStandardUnionObjectInspector(uois);
   }
 
   @Override
@@ -240,9 +231,12 @@ public void processOp(Object row, int tag) throws HiveException {
       ObjectInspector rowInspector = inputObjInspectors[tag];
       if (firstRow) {
         firstRow = false;
-        keyObjectInspector = initEvaluatorsAndReturnStruct(keyEval,
-            distinctColIndices,
-            conf.getOutputKeyColumnNames(), numDistributionKeys, rowInspector);
+        distKeyObjectInspector = initEvaluators(keyEval, 0, numDistributionKeys, rowInspector);
+        if (keyEval.length > numDistributionKeys) {
+          unionKeyObjectInspector = initDistinctObjectInspector(keyEval,
+              distinctColIndices, rowInspector);
+        }
+        keyDescending = toOrder(conf.getOrder());
         valueObjectInspector = initEvaluatorsAndReturnStruct(valueEval, conf
             .getOutputValueColumnNames(), rowInspector);
         partitionObjectInspectors = initEvaluators(partitionEval, rowInspector);
@@ -282,7 +276,10 @@ public void processOp(Object row, int tag) throws HiveException {
       // Evaluate the keys
       for (int i = 0; i < numDistributionKeys; i++) {
         cachedKeys[0][i] = keyEval[i].evaluate(row);
+        keySerializer.append(cachedKeys[0][i], distKeyObjectInspector[i], keyDescending[i]);
       }
+      keyWritable.setDistKeyLength(keySerializer.mark());
+
       if (numDistinctExprs > 0) {
         // with distinct key(s)
         for (int i = 0; i < numDistinctExprs; i++) {
@@ -306,32 +303,11 @@ public void processOp(Object row, int tag) throws HiveException {
       BytesWritable value = null;
       // Serialize the keys and append the tag
      for (int i = 0; i < cachedKeys.length; i++) {
-        if (keyIsText) {
-          Text key = (Text) keySerializer.serialize(cachedKeys[i],
-              keyObjectInspector);
-          if (tag == -1) {
-            keyWritable.set(key.getBytes(), 0, key.getLength());
-          } else {
-            int keyLength = key.getLength();
-            keyWritable.setSize(keyLength + 1);
-            System.arraycopy(key.getBytes(), 0, keyWritable.get(), 0, keyLength);
-            keyWritable.get()[keyLength] = tagByte[0];
-          }
-        } else {
-          // Must be BytesWritable
-          BytesWritable key = (BytesWritable) keySerializer.serialize(
-              cachedKeys[i], keyObjectInspector);
-          if (tag == -1) {
-            keyWritable.set(key.getBytes(), 0, key.getLength());
-          } else {
-            int keyLength = key.getLength();
-            keyWritable.setSize(keyLength + 1);
-            System.arraycopy(key.getBytes(), 0,
keyWritable.get(), 0, keyLength); - keyWritable.get()[keyLength] = tagByte[0]; - } + if (numDistinctExprs > 0) { + keySerializer.append(cachedKeys[i][numDistributionKeys], unionKeyObjectInspector, false); } + HiveKey keyWritable = toHiveKey(keySerializer.reset(), tag); keyWritable.setHashCode(keyHashCode); - if (reducerHash == null) { if (null != out) { collect(keyWritable, value = getValue(row, value)); @@ -362,6 +338,7 @@ public void processOp(Object row, int tag) throws HiveException { } } } + keySerializer.clear(); } catch (HiveException e) { throw e; } catch (Exception e) { @@ -369,6 +346,26 @@ public void processOp(Object row, int tag) throws HiveException { } } + private boolean[] toOrder(String order) { + boolean[] descending = new boolean[order.length()]; + for (int i = 0; i < descending.length; i++) { + descending[i] = order.charAt(i) == '-'; + } + return descending; + } + + private HiveKey toHiveKey(BinaryComparable key, int tag) throws SerDeException { + if (tag == -1) { + keyWritable.set(key.getBytes(), 0, key.getLength()); + } else { + int keyLength = key.getLength(); + keyWritable.setSize(keyLength + 1); + System.arraycopy(key.getBytes(), 0, keyWritable.get(), 0, keyLength); + keyWritable.get()[keyLength] = tagByte[0]; + } + return keyWritable; + } + public void collect(BytesWritable key, BytesWritable value) throws IOException { // Since this is a terminal operator, update counters explicitly - // forward is not called diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java index 3111367..20b62ca 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java @@ -58,6 +58,7 @@ protected final byte[][] keys; protected final byte[][] values; protected final int[] hashes; + protected final int[] keyLengths; protected int evicted; // recetly evicted index (the biggest one. used for next key/value) protected int excluded; // count of excluded rows from previous flush @@ -66,7 +67,9 @@ public int compare(Integer o1, Integer o2) { byte[] key1 = keys[o1]; byte[] key2 = keys[o2]; - return WritableComparator.compareBytes(key1, 0, key1.length, key2, 0, key2.length); + int length1 = keyLengths[o1]; + int length2 = keyLengths[o2]; + return WritableComparator.compareBytes(key1, 0, length1, key2, 0, length2); } }; @@ -92,6 +95,7 @@ public static TopNHash create(boolean grouped, int topN, long threshold, this.keys = new byte[topN + 1][]; this.values = new byte[topN + 1][]; this.hashes = new int[topN + 1]; + this.keyLengths = new int[topN + 1]; this.evicted = topN; } @@ -112,6 +116,7 @@ public int indexOf(HiveKey key) { int index = size < topN ? size : evicted; keys[index] = Arrays.copyOf(key.getBytes(), key.getLength()); hashes[index] = key.hashCode(); + keyLengths[index] = key.getDistKeyLength(); if (!store(index)) { // it's only for GBY which should forward all values associated with the key in the range // of limit. 
new value should be attatched with the key but in current implementation, @@ -148,6 +153,7 @@ public void removed(int index) { values[index] = null; } hashes[index] = -1; + keyLengths[index] = -1; } public void set(int index, BytesWritable value) { diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java index 99c0209..f9cf2bd 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java @@ -29,7 +29,10 @@ private static final int LENGTH_BYTES = 4; - boolean hashCodeValid; + private int hashCode; + private boolean hashCodeValid; + + private transient int distKeyLength; public HiveKey() { hashCodeValid = false; @@ -37,15 +40,13 @@ public HiveKey() { public HiveKey(byte[] bytes, int hashcode) { super(bytes); - myHashCode = hashcode; + hashCode = hashcode; hashCodeValid = true; } - protected int myHashCode; - public void setHashCode(int myHashCode) { hashCodeValid = true; - this.myHashCode = myHashCode; + hashCode = myHashCode; } @Override @@ -54,7 +55,15 @@ public int hashCode() { throw new RuntimeException("Cannot get hashCode() from deserialized " + HiveKey.class); } - return myHashCode; + return hashCode; + } + + public void setDistKeyLength(int distKeyLength) { + this.distKeyLength = distKeyLength; + } + + public int getDistKeyLength() { + return distKeyLength; } /** A Comparator optimized for HiveKey. */ diff --git ql/src/test/queries/clientpositive/limit_pushdown.q ql/src/test/queries/clientpositive/limit_pushdown.q index e4d0aa0..26d4c61 100644 --- ql/src/test/queries/clientpositive/limit_pushdown.q +++ ql/src/test/queries/clientpositive/limit_pushdown.q @@ -22,12 +22,12 @@ select value,avg(key + 1) from src group by value order by value limit 20; -- distincts explain -select distinct(key) from src limit 20; -select distinct(key) from src limit 20; +select distinct(cdouble) from alltypesorc limit 20; +select distinct(cdouble) from alltypesorc limit 20; explain -select key, count(distinct(key)) from src group by key limit 20; -select key, count(distinct(key)) from src group by key limit 20; +select ctinyint, count(distinct(cdouble)) from alltypesorc group by ctinyint limit 20; +select ctinyint, count(distinct(cdouble)) from alltypesorc group by ctinyint limit 20; -- limit zero explain diff --git ql/src/test/results/clientpositive/limit_pushdown.q.out ql/src/test/results/clientpositive/limit_pushdown.q.out index 6aa5feb..ea1fa86 100644 --- ql/src/test/results/clientpositive/limit_pushdown.q.out +++ ql/src/test/results/clientpositive/limit_pushdown.q.out @@ -392,14 +392,14 @@ val_129 130.0 val_131 132.0 PREHOOK: query: -- distincts explain -select distinct(key) from src limit 20 +select distinct(cdouble) from alltypesorc limit 20 PREHOOK: type: QUERY POSTHOOK: query: -- distincts explain -select distinct(key) from src limit 20 +select distinct(cdouble) from alltypesorc limit 20 POSTHOOK: type: QUERY ABSTRACT SYNTAX TREE: - (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME src))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECTDI (TOK_SELEXPR (TOK_TABLE_OR_COL key))) (TOK_LIMIT 20))) + (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME alltypesorc))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECTDI (TOK_SELEXPR (TOK_TABLE_OR_COL cdouble))) (TOK_LIMIT 20))) STAGE DEPENDENCIES: Stage-1 is a root stage @@ -409,29 +409,29 @@ STAGE PLANS: Stage: Stage-1 Map Reduce Alias -> Map Operator Tree: - src + alltypesorc TableScan - alias: src + alias: 
alltypesorc Select Operator expressions: - expr: key - type: string - outputColumnNames: key + expr: cdouble + type: double + outputColumnNames: cdouble Group By Operator bucketGroup: false keys: - expr: key - type: string + expr: cdouble + type: double mode: hash outputColumnNames: _col0 Reduce Output Operator key expressions: expr: _col0 - type: string + type: double sort order: + Map-reduce partition columns: expr: _col0 - type: string + type: double tag: -1 TopN: 20 TopN Hash Memory Usage: 0.3 @@ -440,13 +440,13 @@ STAGE PLANS: bucketGroup: false keys: expr: KEY._col0 - type: string + type: double mode: mergepartial outputColumnNames: _col0 Select Operator expressions: expr: _col0 - type: string + type: double outputColumnNames: _col0 Limit File Output Operator @@ -462,42 +462,42 @@ STAGE PLANS: limit: 20 -PREHOOK: query: select distinct(key) from src limit 20 +PREHOOK: query: select distinct(cdouble) from alltypesorc limit 20 PREHOOK: type: QUERY -PREHOOK: Input: default@src +PREHOOK: Input: default@alltypesorc #### A masked pattern was here #### -POSTHOOK: query: select distinct(key) from src limit 20 +POSTHOOK: query: select distinct(cdouble) from alltypesorc limit 20 POSTHOOK: type: QUERY -POSTHOOK: Input: default@src +POSTHOOK: Input: default@alltypesorc #### A masked pattern was here #### -0 -10 -100 -103 -104 -105 -11 -111 -113 -114 -116 -118 -119 -12 -120 -125 -126 -128 -129 -131 +NULL +-16379.0 +-16373.0 +-16372.0 +-16369.0 +-16355.0 +-16339.0 +-16324.0 +-16311.0 +-16310.0 +-16309.0 +-16307.0 +-16306.0 +-16305.0 +-16300.0 +-16296.0 +-16280.0 +-16277.0 +-16274.0 +-16269.0 PREHOOK: query: explain -select key, count(distinct(key)) from src group by key limit 20 +select ctinyint, count(distinct(cdouble)) from alltypesorc group by ctinyint limit 20 PREHOOK: type: QUERY POSTHOOK: query: explain -select key, count(distinct(key)) from src group by key limit 20 +select ctinyint, count(distinct(cdouble)) from alltypesorc group by ctinyint limit 20 POSTHOOK: type: QUERY ABSTRACT SYNTAX TREE: - (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME src))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL key)) (TOK_SELEXPR (TOK_FUNCTIONDI count (TOK_TABLE_OR_COL key)))) (TOK_GROUPBY (TOK_TABLE_OR_COL key)) (TOK_LIMIT 20))) + (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME alltypesorc))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL ctinyint)) (TOK_SELEXPR (TOK_FUNCTIONDI count (TOK_TABLE_OR_COL cdouble)))) (TOK_GROUPBY (TOK_TABLE_OR_COL ctinyint)) (TOK_LIMIT 20))) STAGE DEPENDENCIES: Stage-1 is a root stage @@ -507,36 +507,42 @@ STAGE PLANS: Stage: Stage-1 Map Reduce Alias -> Map Operator Tree: - src + alltypesorc TableScan - alias: src + alias: alltypesorc Select Operator expressions: - expr: key - type: string - outputColumnNames: key + expr: ctinyint + type: tinyint + expr: cdouble + type: double + outputColumnNames: ctinyint, cdouble Group By Operator aggregations: - expr: count(DISTINCT key) + expr: count(DISTINCT cdouble) bucketGroup: false keys: - expr: key - type: string + expr: ctinyint + type: tinyint + expr: cdouble + type: double mode: hash - outputColumnNames: _col0, _col1 + outputColumnNames: _col0, _col1, _col2 Reduce Output Operator key expressions: expr: _col0 - type: string + type: tinyint + expr: _col1 + type: double sort order: ++ Map-reduce partition columns: expr: _col0 - type: string + type: tinyint tag: -1 TopN: 20 TopN Hash Memory Usage: 0.3 value expressions: - expr: _col1 + 
expr: _col2 type: bigint Reduce Operator Tree: Group By Operator @@ -545,13 +551,13 @@ STAGE PLANS: bucketGroup: false keys: expr: KEY._col0 - type: string + type: tinyint mode: mergepartial outputColumnNames: _col0, _col1 Select Operator expressions: expr: _col0 - type: string + type: tinyint expr: _col1 type: bigint outputColumnNames: _col0, _col1 @@ -569,34 +575,34 @@ STAGE PLANS: limit: 20 -PREHOOK: query: select key, count(distinct(key)) from src group by key limit 20 +PREHOOK: query: select ctinyint, count(distinct(cdouble)) from alltypesorc group by ctinyint limit 20 PREHOOK: type: QUERY -PREHOOK: Input: default@src +PREHOOK: Input: default@alltypesorc #### A masked pattern was here #### -POSTHOOK: query: select key, count(distinct(key)) from src group by key limit 20 +POSTHOOK: query: select ctinyint, count(distinct(cdouble)) from alltypesorc group by ctinyint limit 20 POSTHOOK: type: QUERY -POSTHOOK: Input: default@src +POSTHOOK: Input: default@alltypesorc #### A masked pattern was here #### -0 1 -10 1 -100 1 -103 1 -104 1 -105 1 -11 1 -111 1 -113 1 -114 1 -116 1 -118 1 -119 1 -12 1 -120 1 -125 1 -126 1 -128 1 -129 1 -131 1 +NULL 2932 +-64 24 +-63 19 +-62 27 +-61 25 +-60 27 +-59 31 +-58 23 +-57 35 +-56 36 +-55 29 +-54 26 +-53 22 +-52 33 +-51 21 +-50 30 +-49 26 +-48 29 +-47 22 +-46 24 PREHOOK: query: -- limit zero explain select key,value from src order by key limit 0 diff --git serde/src/java/org/apache/hadoop/hive/serde2/KeySerializer.java serde/src/java/org/apache/hadoop/hive/serde2/KeySerializer.java new file mode 100644 index 0000000..ba42f51 --- /dev/null +++ serde/src/java/org/apache/hadoop/hive/serde2/KeySerializer.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.serde2; + +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.io.BinaryComparable; + +public interface KeySerializer extends Serializer { + + void append(Object obj, ObjectInspector objInspector, boolean descending) throws SerDeException; + + int mark() throws SerDeException; + + BinaryComparable reset() throws SerDeException; + + void clear() throws SerDeException; +} diff --git serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableSerDe.java serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableSerDe.java index df85961..45e0b92 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableSerDe.java +++ serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableSerDe.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.AbstractSerDe; +import org.apache.hadoop.hive.serde2.KeySerializer; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeStats; import org.apache.hadoop.hive.serde2.io.ByteWritable; @@ -73,6 +74,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo; +import org.apache.hadoop.io.BinaryComparable; import org.apache.hadoop.io.BooleanWritable; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.FloatWritable; @@ -104,7 +106,7 @@ * fields in the same top-level field will have the same sort order. * */ -public class BinarySortableSerDe extends AbstractSerDe { +public class BinarySortableSerDe extends AbstractSerDe implements KeySerializer { public static final Log LOG = LogFactory.getLog(BinarySortableSerDe.class .getName()); @@ -580,7 +582,7 @@ static Text deserializeText(InputByteBuffer buffer, boolean invert, Text r) @Override public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException { - outputByteBuffer.reset(); + outputByteBuffer.clear(); StructObjectInspector soi = (StructObjectInspector) objInspector; List fields = soi.getAllStructFieldRefs(); @@ -594,6 +596,29 @@ public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDe return serializeBytesWritable; } + @Override + public void append(Object obj, ObjectInspector objInspector, boolean descending) + throws SerDeException { + serialize(outputByteBuffer, obj, objInspector, descending); + } + + @Override + public int mark() throws SerDeException { + return outputByteBuffer.mark(); + } + + @Override + public BinaryComparable reset() throws SerDeException { + serializeBytesWritable.set(outputByteBuffer.getData(), 0, outputByteBuffer.getLength()); + outputByteBuffer.reset(); + return serializeBytesWritable; + } + + @Override + public void clear() throws SerDeException { + outputByteBuffer.clear(); + } + static void serialize(OutputByteBuffer buffer, Object o, ObjectInspector oi, boolean invert) throws SerDeException { // Is this field a null? 
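The ReduceSinkOperator and BinarySortableSerDe changes above replace the old serialize-per-row path with an incremental append/mark/reset/clear cycle. Below is a minimal sketch of that per-row call sequence, assuming a BinarySortableSerDe-backed key serializer; distKeyValues and unionKeys are hypothetical stand-ins for the operator's cachedKeys arrays, keyTableDesc comes from ReduceSinkDesc.getKeySerializeInfo(), and error handling is omitted.

    // Sketch only: how the new KeySerializer interface is driven for one row.
    KeySerializer keySerializer = new BinarySortableSerDe();
    keySerializer.initialize(null, keyTableDesc.getProperties());

    // 1. Append each distribution-key column with its sort direction.
    for (int i = 0; i < numDistributionKeys; i++) {
      keySerializer.append(distKeyValues[i], distKeyObjectInspector[i], keyDescending[i]);
    }
    // 2. mark() returns the serialized length of the distribution key;
    //    ReduceSinkOperator stores it via HiveKey.setDistKeyLength().
    int distKeyLength = keySerializer.mark();

    // 3. For each distinct expression, append the union member and call reset()
    //    to obtain the full key bytes; reset() rewinds to the mark so the next
    //    distinct value reuses the buffered distribution-key prefix.
    for (Object unionKey : unionKeys) {
      keySerializer.append(unionKey, unionKeyObjectInspector, false);
      BinaryComparable fullKey = keySerializer.reset();
      // fullKey feeds toHiveKey(), which appends the tag byte when tag != -1.
    }
    // 4. clear() empties the buffer entirely before the next row.
    keySerializer.clear();
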
diff --git serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/OutputByteBuffer.java serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/OutputByteBuffer.java index 7bfe473..0d9a8d9 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/OutputByteBuffer.java +++ serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/OutputByteBuffer.java @@ -27,12 +27,21 @@ byte[] data = new byte[128]; int length; + int mark; /** * Reset the byte buffer. */ - public void reset() { - length = 0; + public int reset() { + return length = mark; + } + + public int mark() { + return mark = length; + } + + public void clear() { + length = mark = 0; } public final void write(byte b) { @@ -82,5 +91,4 @@ public String dumpHex() { } return sb.toString(); } - }
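
The OutputByteBuffer change is what makes that cycle work: mark() records the current write position, reset() now rewinds to the mark instead of emptying the buffer, and clear() zeroes both, which is why BinarySortableSerDe.serialize() switches from reset() to clear(). A minimal sketch of the intended state transitions, with arbitrary byte values:

    OutputByteBuffer buffer = new OutputByteBuffer();
    buffer.write((byte) 0x01);           // length = 1
    buffer.write((byte) 0x02);           // length = 2, end of the distribution key
    int distKeyLength = buffer.mark();   // mark = 2, surfaced as HiveKey.getDistKeyLength()
    buffer.write((byte) 0x03);           // length = 3, first distinct-key suffix
    buffer.reset();                      // length rewinds to mark = 2, prefix kept
    buffer.write((byte) 0x04);           // second distinct value reuses the prefix
    buffer.clear();                      // length = mark = 0, ready for the next row

TopNHash keeps that distribution-key length per stored key (keyLengths) so its comparator ranks candidates on the distribution-key prefix only, which is what keeps limit pushdown correct when distinct-key bytes are appended after the prefix.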