diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
index e03f4b7..a345afa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
@@ -496,8 +496,10 @@ protected HiveKey toHiveKey(Object obj, int tag, Integer distLength) throws SerD
   }
 
   @Override
-  public void collect(byte[] key, byte[] value, int hash) throws IOException {
+  public void collect(byte[] key, byte[] value, int hash, boolean hasNonCompBytes)
+      throws IOException {
     HiveKey keyWritable = new HiveKey(key, hash);
+    keyWritable.setHasNonCompBytes(hasNonCompBytes);
     BytesWritable valueWritable = new BytesWritable(value);
     collect(keyWritable, valueWritable);
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java
index f3c7c77..a084aac 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java
@@ -50,7 +50,7 @@
    * Currently only used to forward key/values stored in hash.
    */
   public static interface BinaryCollector {
-    public void collect(byte[] key, byte[] value, int hash) throws IOException;
+    public void collect(byte[] key, byte[] value, int hash, boolean hasNonCompBytes) throws IOException;
   }
 
   public static final int FORWARD = -1; // Forward the row to reducer as is.
@@ -68,6 +68,7 @@
   private byte[][] values;
   private int[] hashes;
   private int[] distKeyLengths;
+  private boolean[] hasNonCompBytes;
   private IndexStore indexes; // The heap over the keys, storing indexes in the array.
 
   private int evicted; // recently evicted index (used for next key/value)
@@ -87,7 +88,9 @@ public int compare(Integer o1, Integer o2) {
       byte[] key1 = keys[o1];
       byte[] key2 = keys[o2];
       int length1 = distKeyLengths[o1];
       int length2 = distKeyLengths[o2];
-      return WritableComparator.compareBytes(key1, 0, length1, key2, 0, length2);
+      boolean hasNonComp1 = hasNonCompBytes[o1];
+      boolean hasNonComp2 = hasNonCompBytes[o2];
+      return HiveKey.compare(key1, 0, length1, hasNonComp1, key2, 0, length2, hasNonComp2);
     }
   };
@@ -131,6 +134,7 @@ public void initialize(
     this.values = new byte[topN + 1][];
     this.hashes = new int[topN + 1];
     this.distKeyLengths = new int[topN + 1];
+    this.hasNonCompBytes = new boolean[topN + 1];
     this.evicted = topN;
     this.isEnabled = true;
   }
@@ -217,6 +221,7 @@ public void tryStoreVectorizedKey(HiveKey key, boolean partColsIsNull, int batch
     keys[index] = Arrays.copyOf(key.getBytes(), key.getLength());
     distKeyLengths[index] = key.getDistKeyLength();
     hashes[index] = key.hashCode();
+    hasNonCompBytes[index] = key.getHasNonCompBytes();
     Integer collisionIndex = indexes.store(index);
     if (null != collisionIndex) {
       /*
@@ -284,6 +289,7 @@ public HiveKey getVectorizedKeyToForward(int batchIndex) {
     hk.set(keys[index], 0, keys[index].length);
     hk.setHashCode(hashes[index]);
     hk.setDistKeyLength(distKeyLengths[index]);
+    hk.setHasNonCompBytes(hasNonCompBytes[index]);
     return hk;
   }
 
@@ -354,6 +360,7 @@ private int insertKeyIntoHeap(HiveKey key) throws IOException, HiveException {
     keys[index] = Arrays.copyOf(key.getBytes(), key.getLength());
     distKeyLengths[index] = key.getDistKeyLength();
     hashes[index] = key.hashCode();
+    hasNonCompBytes[index] = key.getHasNonCompBytes();
     if (null != indexes.store(index)) {
       // it's only for GBY which should forward all values associated with the key in the range
       // of limit. new value should be attatched with the key but in current implementation,
@@ -382,15 +389,17 @@ private void removed(int index) {
     }
     hashes[index] = -1;
     distKeyLengths[index] = -1;
+    hasNonCompBytes[index] = false;
   }
 
   private void flushInternal() throws IOException, HiveException {
     for (int index : indexes.indexes()) {
       if (index != evicted && values[index] != null) {
-        collector.collect(keys[index], values[index], hashes[index]);
+        collector.collect(keys[index], values[index], hashes[index], hasNonCompBytes[index]);
         usage -= values[index].length;
         values[index] = null;
         hashes[index] = -1;
+        hasNonCompBytes[index] = false;
       }
     }
     excluded = 0;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
index 1dffff2..8fc6d1a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
@@ -19,14 +19,13 @@
 package org.apache.hadoop.hive.ql.exec.mr;
 
 import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryMXBean;
 import java.net.URLClassLoader;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
@@ -88,7 +87,7 @@
 
   // runtime objects
   private transient Object keyObject;
-  private transient BytesWritable groupKey;
+  private transient HiveKey groupKey;
 
   @Override
   public void configure(JobConf job) {
@@ -174,20 +173,20 @@ public void reduce(Object key, Iterator values, OutputCollector output,
     }
 
     try {
-      BytesWritable keyWritable = (BytesWritable) key;
+      HiveKey keyWritable = (HiveKey) key;
       byte tag = 0;
       if (isTagged) {
        // remove the tag from key coming out of reducer
        // and store it in separate variable.
-        int size = keyWritable.getSize() - 1;
-        tag = keyWritable.get()[size];
+        int size = keyWritable.getLength() - 1;
+        tag = keyWritable.getBytes()[size];
        keyWritable.setSize(size);
      }
 
      if (!keyWritable.equals(groupKey)) {
        // If a operator wants to do some work at the beginning of a group
        if (groupKey == null) { // the first group
-          groupKey = new BytesWritable();
+          groupKey = new HiveKey();
        } else {
          // If a operator wants to do some work at the end of a group
          if (isTraceEnabled) {
@@ -206,7 +205,8 @@ public void reduce(Object key, Iterator values, OutputCollector output,
              + keyTableDesc.getProperties(), e);
        }
 
-        groupKey.set(keyWritable.get(), 0, keyWritable.getSize());
+        groupKey.set(keyWritable.getBytes(), 0, keyWritable.getLength());
+        groupKey.setHasNonCompBytes(keyWritable.getHasNonCompBytes());
        if (isTraceEnabled) {
          LOG.trace("Start Group");
        }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/KeyValueContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/KeyValueContainer.java
index 72faf8b..2085fd0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/KeyValueContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/KeyValueContainer.java
@@ -109,6 +109,7 @@ private void writeValue(Output output, BytesWritable bytesWritable) {
   private HiveKey readHiveKey(Input input) {
     HiveKey hiveKey = new HiveKey(input.readBytes(input.readInt()), input.readInt());
     hiveKey.setDistKeyLength(input.readInt());
+    hiveKey.setHasNonCompBytes(input.readBoolean());
     return hiveKey;
   }
 
@@ -118,6 +119,7 @@ private void writeHiveKey(Output output, HiveKey hiveKey) {
     output.writeBytes(hiveKey.getBytes(), 0, size);
     output.writeInt(0); // Since hashCode is not used, just put an arbitrary number
     output.writeInt(hiveKey.getDistKeyLength());
+    output.writeBoolean(hiveKey.getHasNonCompBytes());
   }
 
   public void add(HiveKey key, BytesWritable value) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveKVResultCache.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveKVResultCache.java
index c3e820d..ecdefd8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveKVResultCache.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveKVResultCache.java
@@ -130,6 +130,7 @@ private HiveKey readHiveKey(Input input) {
     HiveKey hiveKey = new HiveKey(
         input.readBytes(input.readInt()), input.readInt());
     hiveKey.setDistKeyLength(input.readInt());
+    hiveKey.setHasNonCompBytes(input.readBoolean());
     return hiveKey;
   }
 
@@ -139,6 +140,7 @@ private void writeHiveKey(Output output, HiveKey hiveKey) {
     output.writeBytes(hiveKey.getBytes(), 0, size);
     output.writeInt(hiveKey.hashCode());
     output.writeInt(hiveKey.getDistKeyLength());
+    output.writeBoolean(hiveKey.getHasNonCompBytes());
   }
 
   public synchronized void add(HiveKey key, BytesWritable value) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
index 7eaad18..48d68e2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
@@ -24,6 +24,7 @@
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
@@ -88,7 +89,7 @@
 
   // runtime objects
   private transient Object keyObject;
-  private transient BytesWritable groupKey;
+  private transient HiveKey groupKey;
 
   private DataOutputBuffer buffer;
   private VectorizedRowBatch[] batches;
@@ -259,22 +260,24 @@ public void processRow(Object key, final Object value) throws IOException {
     }
 
     try {
-      BytesWritable keyWritable = (BytesWritable) key;
+      HiveKey keyWritable = (HiveKey) key;
       byte tag = 0;
       if (isTagged) {
        // remove the tag from key coming out of reducer
        // and store it in separate variable.
        // make a copy for multi-insert with join case as Spark re-uses input key from same parent
-        int size = keyWritable.getSize() - 1;
-        tag = keyWritable.get()[size];
-        keyWritable = new BytesWritable(keyWritable.getBytes(), size);
+        keyWritable = new HiveKey();
+        keyWritable.set(((HiveKey) key).getBytes(), 0, ((HiveKey) key).getLength());
+        keyWritable.setHasNonCompBytes(((HiveKey) key).getHasNonCompBytes());
+        int size = keyWritable.getLength() - 1;
+        tag = keyWritable.getBytes()[size];
        keyWritable.setSize(size);
      }
 
      if (!keyWritable.equals(groupKey)) {
        // If a operator wants to do some work at the beginning of a group
        if (groupKey == null) { // the first group
-          groupKey = new BytesWritable();
+          groupKey = new HiveKey();
        } else {
          // If a operator wants to do some work at the end of a group
          LOG.trace("End Group");
@@ -291,7 +294,8 @@ public void processRow(Object key, final Object value) throws IOException {
              + keyTableDesc.getProperties(), e);
        }
 
-        groupKey.set(keyWritable.get(), 0, keyWritable.getSize());
+        groupKey.set(keyWritable.getBytes(), 0, keyWritable.getLength());
+        groupKey.setHasNonCompBytes(keyWritable.getHasNonCompBytes());
        LOG.trace("Start Group");
        reducer.setGroupKeyObject(keyObject);
        reducer.startGroup();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
index eb9883a..00e89fb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
@@ -17,10 +17,8 @@
  */
 package org.apache.hadoop.hive.ql.exec.spark;
 
-import java.io.File;
 import java.io.IOException;
 import java.net.URI;
-import java.net.URISyntaxException;
 import java.util.Collection;
 
 import com.google.common.base.Preconditions;
@@ -57,6 +55,7 @@ public static HiveKey copyHiveKey(HiveKey key) {
     copy.setDistKeyLength(key.getDistKeyLength());
     copy.setHashCode(key.hashCode());
     copy.set(key);
+    copy.setHasNonCompBytes(key.getHasNonCompBytes());
     return copy;
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
index d9caa47..c9cbaf8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
@@ -23,6 +23,7 @@
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator;
@@ -90,7 +91,7 @@
   private Operator<?> reducer;
 
   private Object keyObject = null;
-  private BytesWritable groupKey;
+  private HiveKey groupKey;
 
   private boolean vectorized = false;
 
@@ -256,7 +257,7 @@ public boolean pushRecord() throws HiveException {
       return false;
     }
 
-    BytesWritable keyWritable = (BytesWritable) reader.getCurrentKey();
+    HiveKey keyWritable = (HiveKey) reader.getCurrentKey();
     valueWritables = reader.getCurrentValues();
 
     //Set the key, check if this is a new group or same group
@@ -271,13 +272,14 @@ public boolean pushRecord() throws HiveException {
     if (handleGroupKey && !keyWritable.equals(this.groupKey)) {
       // If a operator wants to do some work at the beginning of a group
       if (groupKey == null) { // the first group
-        this.groupKey = new BytesWritable();
+        this.groupKey = new HiveKey();
       } else {
         // If a operator wants to do some work at the end of a group
         reducer.endGroup();
       }
 
       groupKey.set(keyWritable.getBytes(), 0, keyWritable.getLength());
+      groupKey.setHasNonCompBytes(keyWritable.getHasNonCompBytes());
       reducer.startGroup();
       reducer.setGroupKeyObject(keyObject);
     }
@@ -374,20 +376,21 @@ private boolean pushRecordVector() {
       return false;
     }
 
-    BytesWritable keyWritable = (BytesWritable) reader.getCurrentKey();
+    HiveKey keyWritable = (HiveKey) reader.getCurrentKey();
     valueWritables = reader.getCurrentValues();
 
     // Check if this is a new group or same group
     if (handleGroupKey && !keyWritable.equals(this.groupKey)) {
       // If a operator wants to do some work at the beginning of a group
       if (groupKey == null) { // the first group
-        this.groupKey = new BytesWritable();
+        this.groupKey = new HiveKey();
       } else {
         // If a operator wants to do some work at the end of a group
         reducer.endGroup();
       }
 
       groupKey.set(keyWritable.getBytes(), 0, keyWritable.getLength());
+      groupKey.setHasNonCompBytes(keyWritable.getHasNonCompBytes());
       reducer.startGroup();
     }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java
index f9cf2bd..fe68f91 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java
@@ -18,8 +18,15 @@
 
 package org.apache.hadoop.hive.ql.io;
 
+import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUtils;
+import org.apache.hadoop.io.BinaryComparable;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 
 /**
  * HiveKey is a simple wrapper on Text which allows us to set the hashCode
@@ -34,6 +41,9 @@
 
   private transient int distKeyLength;
 
+  // indicates some bytes need to be skipped for comparison
+  private boolean hasNonCompBytes = false;
+
   public HiveKey() {
     hashCodeValid = false;
   }
@@ -49,6 +59,47 @@ public void setHashCode(int myHashCode) {
     hashCode = myHashCode;
   }
 
+  public void setHasNonCompBytes(boolean hasNonCompBytes) {
+    this.hasNonCompBytes = hasNonCompBytes;
+  }
+
+  public boolean getHasNonCompBytes() {
+    return hasNonCompBytes;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    if (hasNonCompBytes) {
+      int len = getLength();
+      out.writeInt(-len);
+      out.write(getBytes(), 0, len);
+    } else {
+      super.write(out);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int len = in.readInt();
+    if (len < 0) {
+      len = -len;
+      setHasNonCompBytes(true);
+    } else {
+      setHasNonCompBytes(false);
+    }
+    setSize(0);
+    setSize(len);
+    in.readFully(getBytes(), 0, len);
+  }
+
+  @Override
+  public boolean equals(Object right_obj) {
+    if (right_obj instanceof HiveKey) {
+      return compareTo((HiveKey) right_obj) == 0;
+    }
+    return false;
+  }
+
   @Override
   public int hashCode() {
     if (!hashCodeValid) {
@@ -66,6 +117,43 @@ public int getDistKeyLength() {
     return distKeyLength;
   }
 
+  @Override
+  public int compareTo(BinaryComparable other) {
+    if (this == other) {
+      return 0;
+    }
+    if (!(other instanceof HiveKey)) {
+      return -1;
+    }
+    byte[] bytes1 = getBytes();
+    int len1 = getLength();
+    byte[] bytes2 = other.getBytes();
+    int len2 = other.getLength();
+    return compare(bytes1, 0, len1, getHasNonCompBytes(),
+        bytes2, 0, len2, ((HiveKey) other).getHasNonCompBytes());
+  }
+
+  @Override
+  public int compareTo(byte[] other, int off, int len) {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Get the length of non-comparable bytes.
+   */
+  private static int getNonCompLen(byte[] bytes, int start) {
+    // the first VInt stores the length of bytes need to be skipped in comparison
+    return (int) (LazyBinaryUtils.readVLongFromByteArray(bytes, start)
+        + WritableUtils.decodeVIntSize(bytes[start]));
+  }
+
+  public static int compare(byte[] b1, int s1, int l1, boolean h1,
+      byte[] b2, int s2, int l2, boolean h2) {
+    int skip1 = h1 ? getNonCompLen(b1, s1) : 0;
+    int skip2 = h2 ? getNonCompLen(b2, s2) : 0;
+    return WritableComparator.compareBytes(b1, s1 + skip1, l1 - skip1, b2, s2 + skip2, l2 - skip2);
+  }
+
   /** A Comparator optimized for HiveKey. */
   public static class Comparator extends WritableComparator {
     public Comparator() {
@@ -77,8 +165,8 @@ public Comparator() {
      */
     @Override
     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
-      return compareBytes(b1, s1 + LENGTH_BYTES, l1 - LENGTH_BYTES, b2, s2
-          + LENGTH_BYTES, l2 - LENGTH_BYTES);
+      return HiveKey.compare(b1, s1 + LENGTH_BYTES, l1 - LENGTH_BYTES, b1[s1] < 0,
+          b2, s2 + LENGTH_BYTES, l2 - LENGTH_BYTES, b2[s2] < 0);
     }
   }
 
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveKey.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveKey.java
new file mode 100644
index 0000000..731fb55
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveKey.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io;
+
+import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUtils;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.concurrent.ThreadLocalRandom;
+
+public class TestHiveKey {
+
+  @Test
+  public void testSerDe() throws IOException {
+    // without skipped bytes
+    byte[] key = "key".getBytes();
+    verifySerDe(key, false);
+
+    // with skipped bytes
+    byte[] skippedBytes = getSkippedBytes(ThreadLocalRandom.current().nextInt(1000) + 1);
+    byte[] key2 = new byte[skippedBytes.length + key.length];
+    System.arraycopy(skippedBytes, 0, key2, 0, skippedBytes.length);
+    System.arraycopy(key, 0, key2, skippedBytes.length, key.length);
+    verifySerDe(key2, true);
+  }
+
+  @Test
+  public void testCompare() throws IOException {
+    byte[] key1 = "ab".getBytes();
+    byte[] key2 = "cd".getBytes();
+
+    // compare in deserialized format
+    HiveKey hiveKey1 = new HiveKey(key1, 0);
+    HiveKey hiveKey2 = new HiveKey(key2, 0);
+    byte[] skippedBytes = getSkippedBytes(ThreadLocalRandom.current().nextInt(1000) + 1);
+    byte[] key3 = new byte[key1.length + skippedBytes.length];
+    System.arraycopy(skippedBytes, 0, key3, 0, skippedBytes.length);
+    System.arraycopy(key1, 0, key3, skippedBytes.length, key1.length);
+    HiveKey hiveKey3 = new HiveKey(key3, 0);
+    hiveKey3.setHasNonCompBytes(true);
+    Assert.assertTrue(hiveKey1.compareTo(hiveKey2) < 0);
+    Assert.assertTrue(hiveKey3.compareTo(hiveKey2) < 0);
+    Assert.assertEquals(hiveKey1, hiveKey3);
+
+    // compare in serialized format
+    ByteArrayOutputStream outBytes = new ByteArrayOutputStream();
+    hiveKey1.write(new DataOutputStream(outBytes));
+    byte[] bytes1 = outBytes.toByteArray();
+    outBytes.reset();
+    hiveKey2.write(new DataOutputStream(outBytes));
+    byte[] bytes2 = outBytes.toByteArray();
+    outBytes.reset();
+    hiveKey3.write(new DataOutputStream(outBytes));
+    byte[] bytes3 = outBytes.toByteArray();
+    WritableComparator comparator = WritableComparator.get(HiveKey.class);
+    Assert.assertTrue(comparator.compare(bytes1, 0, bytes1.length, bytes2, 0, bytes2.length) < 0);
+    Assert.assertTrue(comparator.compare(bytes3, 0, bytes3.length, bytes2, 0, bytes2.length) < 0);
+    Assert.assertTrue(comparator.compare(bytes1, 0, bytes1.length, bytes3, 0, bytes3.length) == 0);
+  }
+
+  private static byte[] getSkippedBytes(int numBytes) {
+    int len = WritableUtils.getVIntSize(numBytes) + numBytes;
+    byte[] bytes = new byte[len];
+    LazyBinaryUtils.writeVLongToByteArray(bytes, numBytes);
+    for (int i = len - numBytes; i < bytes.length; i++) {
+      bytes[i] = (byte) ThreadLocalRandom.current().nextInt();
+    }
+    return bytes;
+  }
+
+  private static void verifySerDe(byte[] key, boolean hasSkipped) throws IOException {
+    HiveKey hiveKey = new HiveKey();
+    hiveKey.set(key, 0, key.length);
+    hiveKey.setHasNonCompBytes(hasSkipped);
+    ByteArrayOutputStream outBytes = new ByteArrayOutputStream();
+    hiveKey.write(new DataOutputStream(outBytes));
+    HiveKey deserialized = new HiveKey();
+    ByteArrayInputStream inBytes = new ByteArrayInputStream(outBytes.toByteArray());
+    deserialized.readFields(new DataInputStream(inBytes));
+    Assert.assertArrayEquals(hiveKey.getBytes(), deserialized.getBytes());
+  }
+}
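
Reviewer note (not part of the patch): the core of the change is that a HiveKey may now carry a length-prefixed run of "non-comparable" bytes at the front of its buffer. The hasNonCompBytes flag marks such keys, write()/readFields() encode the flag by negating the serialized length, and both the deserialized compareTo() and the raw-bytes Comparator skip the prefix (read as a LazyBinary VInt) before comparing. The self-contained sketch below restates that mechanism in plain Java as an illustration under simplifying assumptions, not Hive code: the class name NonCompBytesSketch is invented, and a single-byte length prefix stands in for the real VInt.

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/**
 * Illustration of the HiveKey "non-comparable bytes" idea:
 *  - a key may start with a length-prefixed run of bytes that is ignored when ordering;
 *  - the serialized form signals this with a negative length, so a raw-bytes comparator
 *    can detect it from the first byte of the length field without deserializing.
 * Simplification: the prefix length here is one byte, not a LazyBinary VInt.
 */
public class NonCompBytesSketch {

  /** Serialize like HiveKey.write(): a negative length marks a key with a skip prefix. */
  static byte[] serialize(byte[] key, boolean hasNonCompBytes) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bos);
    out.writeInt(hasNonCompBytes ? -key.length : key.length);
    out.write(key, 0, key.length);
    return bos.toByteArray();
  }

  /** Unsigned lexicographic comparison, in the spirit of WritableComparator.compareBytes. */
  static int compareBytes(byte[] a, int aStart, int aLen, byte[] b, int bStart, int bLen) {
    int n = Math.min(aLen, bLen);
    for (int i = 0; i < n; i++) {
      int x = a[aStart + i] & 0xff;
      int y = b[bStart + i] & 0xff;
      if (x != y) {
        return x - y;
      }
    }
    return aLen - bLen;
  }

  /** Compare two deserialized keys, skipping any length-prefixed non-comparable run. */
  static int compare(byte[] k1, boolean h1, byte[] k2, boolean h2) {
    int skip1 = h1 ? 1 + k1[0] : 0;  // one length byte plus that many prefix bytes
    int skip2 = h2 ? 1 + k2[0] : 0;
    return compareBytes(k1, skip1, k1.length - skip1, k2, skip2, k2.length - skip2);
  }

  public static void main(String[] args) throws IOException {
    byte[] plain = "ab".getBytes();           // ordinary key
    byte[] prefixed = {2, 9, 9, 'a', 'b'};    // 2 skipped bytes, then "ab"

    // The two keys order as equal once the prefix is skipped.
    System.out.println(compare(plain, false, prefixed, true));  // prints 0

    // In serialized form, the first byte of the length field is negative iff flagged.
    byte[] serialized = serialize(prefixed, true);
    System.out.println(serialized[0] < 0);                      // prints true
  }
}

Encoding the flag in the sign of the length keeps keys without a prefix byte-compatible with the old format, and it is what allows the patched Comparator to decide whether to skip by checking b1[s1] < 0 on the serialized record alone.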