diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ColumnBuffer.java b/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ColumnBuffer.java
index 3ce1fb3..d603963 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ColumnBuffer.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/thrift/ColumnBuffer.java
@@ -25,6 +25,7 @@
import java.util.BitSet;
import java.util.List;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hive.service.rpc.thrift.TBinaryColumn;
import org.apache.hive.service.rpc.thrift.TBoolColumn;
import org.apache.hive.service.rpc.thrift.TByteColumn;
@@ -177,73 +178,83 @@ public ColumnBuffer(TColumn colValues) {
}
}
- public ColumnBuffer extractSubset(int start, int end) {
- BitSet subNulls = nulls.get(start, end);
+ /**
+ * Extract the first {@code end} values into a new ColumnBuffer and remove them from this one.
+ *
+ * @param end index after the last value to include
+ */
+ public ColumnBuffer extractSubset(int end) {
+ BitSet subNulls = nulls.get(0, end);
if (type == Type.BOOLEAN_TYPE) {
ColumnBuffer subset =
- new ColumnBuffer(type, subNulls, Arrays.copyOfRange(boolVars, start, end));
+ new ColumnBuffer(type, subNulls, Arrays.copyOfRange(boolVars, 0, end));
boolVars = Arrays.copyOfRange(boolVars, end, size);
- nulls = nulls.get(start, size);
+ nulls = nulls.get(end, size);
size = boolVars.length;
return subset;
}
if (type == Type.TINYINT_TYPE) {
ColumnBuffer subset =
- new ColumnBuffer(type, subNulls, Arrays.copyOfRange(byteVars, start, end));
+ new ColumnBuffer(type, subNulls, Arrays.copyOfRange(byteVars, 0, end));
byteVars = Arrays.copyOfRange(byteVars, end, size);
- nulls = nulls.get(start, size);
+ nulls = nulls.get(end, size);
size = byteVars.length;
return subset;
}
if (type == Type.SMALLINT_TYPE) {
ColumnBuffer subset =
- new ColumnBuffer(type, subNulls, Arrays.copyOfRange(shortVars, start, end));
+ new ColumnBuffer(type, subNulls, Arrays.copyOfRange(shortVars, 0, end));
shortVars = Arrays.copyOfRange(shortVars, end, size);
- nulls = nulls.get(start, size);
+ nulls = nulls.get(end, size);
size = shortVars.length;
return subset;
}
if (type == Type.INT_TYPE) {
ColumnBuffer subset =
- new ColumnBuffer(type, subNulls, Arrays.copyOfRange(intVars, start, end));
+ new ColumnBuffer(type, subNulls, Arrays.copyOfRange(intVars, 0, end));
intVars = Arrays.copyOfRange(intVars, end, size);
- nulls = nulls.get(start, size);
+ nulls = nulls.get(end, size);
size = intVars.length;
return subset;
}
if (type == Type.BIGINT_TYPE) {
ColumnBuffer subset =
- new ColumnBuffer(type, subNulls, Arrays.copyOfRange(longVars, start, end));
+ new ColumnBuffer(type, subNulls, Arrays.copyOfRange(longVars, 0, end));
longVars = Arrays.copyOfRange(longVars, end, size);
- nulls = nulls.get(start, size);
+ nulls = nulls.get(end, size);
size = longVars.length;
return subset;
}
if (type == Type.DOUBLE_TYPE || type == Type.FLOAT_TYPE) {
ColumnBuffer subset =
- new ColumnBuffer(type, subNulls, Arrays.copyOfRange(doubleVars, start, end));
+ new ColumnBuffer(type, subNulls, Arrays.copyOfRange(doubleVars, 0, end));
doubleVars = Arrays.copyOfRange(doubleVars, end, size);
- nulls = nulls.get(start, size);
+ nulls = nulls.get(end, size);
size = doubleVars.length;
return subset;
}
if (type == Type.BINARY_TYPE) {
- ColumnBuffer subset = new ColumnBuffer(type, subNulls, binaryVars.subList(start, end));
+ ColumnBuffer subset = new ColumnBuffer(type, subNulls, binaryVars.subList(0, end));
binaryVars = binaryVars.subList(end, binaryVars.size());
- nulls = nulls.get(start, size);
+ nulls = nulls.get(end, size);
size = binaryVars.size();
return subset;
}
if (type == Type.STRING_TYPE) {
- ColumnBuffer subset = new ColumnBuffer(type, subNulls, stringVars.subList(start, end));
+ ColumnBuffer subset = new ColumnBuffer(type, subNulls, stringVars.subList(0, end));
stringVars = stringVars.subList(end, stringVars.size());
- nulls = nulls.get(start, size);
+ nulls = nulls.get(end, size);
size = stringVars.size();
return subset;
}
throw new IllegalStateException("invalid union object");
}
+ @VisibleForTesting
+ BitSet getNulls() {
+ return nulls;
+ }
+
private static final byte[] MASKS = new byte[] {
0x01, 0x02, 0x04, 0x08, 0x10, 0x20, 0x40, (byte)0x80
};
diff --git a/serde/src/test/org/apache/hadoop/hive/serde2/thrift/TestColumnBuffer.java b/serde/src/test/org/apache/hadoop/hive/serde2/thrift/TestColumnBuffer.java
new file mode 100644
index 0000000..5879561
--- /dev/null
+++ b/serde/src/test/org/apache/hadoop/hive/serde2/thrift/TestColumnBuffer.java
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2.thrift;
+
+import com.google.code.tempusfugit.concurrency.RepeatingRule;
+import com.google.code.tempusfugit.concurrency.annotations.Repeating;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit tests for {@link ColumnBuffer}. The Parameterized runner creates one instance of this
+ * class for every column {@link Type} returned by {@link #types()}.
+ */
+@RunWith(Parameterized.class)
+public class TestColumnBuffer {
+  @Rule
+  public RepeatingRule repeatingRule = new RepeatingRule();
+
+  private static final int NUM_VARS = 100;
+  private static final int NUM_NULLS = 30;
+  // Positions flagged as null in the buffer under test; refilled by prepareNullIndices().
+  private static final Set<Integer> nullIndices = new HashSet<>();
+
+  private final Type type;
+  // Value storage passed to the ColumnBuffer constructor: a primitive array or a List.
+  private final Object vars;
+
+  /** Lists the column types to test; each Object[] is the constructor argument of one run. */
+  @Parameterized.Parameters
+  public static Collection<Object[]> types() {
+    return Arrays.asList(new Object[][]{
+            {Type.BOOLEAN_TYPE},
+            {Type.TINYINT_TYPE},
+            {Type.SMALLINT_TYPE},
+            {Type.INT_TYPE},
+            {Type.BIGINT_TYPE},
+            {Type.DOUBLE_TYPE},
+            {Type.FLOAT_TYPE},
+            {Type.BINARY_TYPE},
+            {Type.STRING_TYPE}
+        }
+    );
+  }
+
+  /**
+   * Allocates NUM_VARS default-initialized value slots in the storage form used for the type.
+   *
+   * @param type column type supplied by the Parameterized runner
+   * @throws IllegalArgumentException if the type is not handled by the switch below
+   */
+  public TestColumnBuffer(Type type) {
+    this.type = type;
+    switch (type) {
+      case BOOLEAN_TYPE:
+        vars = new boolean[NUM_VARS];
+        break;
+      case TINYINT_TYPE:
+        vars = new byte[NUM_VARS];
+        break;
+      case SMALLINT_TYPE:
+        vars = new short[NUM_VARS];
+        break;
+      case INT_TYPE:
+        vars = new int[NUM_VARS];
+        break;
+      case BIGINT_TYPE:
+        vars = new long[NUM_VARS];
+        break;
+      case DOUBLE_TYPE:
+      case FLOAT_TYPE:
+        vars = new double[NUM_VARS];
+        break;
+      case BINARY_TYPE:
+        vars = Arrays.asList(new ByteBuffer[NUM_VARS]);
+        break;
+      case STRING_TYPE:
+        vars = Arrays.asList(new String[NUM_VARS]);
+        break;
+      default:
+        throw new IllegalArgumentException("Invalid type " + type);
+    }
+  }
+
+  /** Refills nullIndices with NUM_NULLS distinct random positions in [0, NUM_VARS). */
+  private static void prepareNullIndices() {
+    nullIndices.clear();
+    Random random = ThreadLocalRandom.current();
+    while (nullIndices.size() != NUM_NULLS) {
+      nullIndices.add(random.nextInt(NUM_VARS));
+    }
+  }
+
+  /**
+   * Test if the nulls BitSet is maintained properly when we extract subset from ColumnBuffer.
+   * E.g. suppose we have a ColumnBuffer with nulls [0, 0, 1, 0]. When we split it evenly into
+   * two subsets, the subsets should have nulls [0, 0] and [1, 0] respectively.
+   *
+   * The buffer is drained in randomly sized chunks until no values remain.
+   */
+  @Test
+  @Repeating(repetition = 10)
+  public void testNullsInSubset() {
+    prepareNullIndices();
+    BitSet nulls = new BitSet(NUM_VARS);
+    for (int index : nullIndices) {
+      nulls.set(index);
+    }
+
+    ColumnBuffer columnBuffer = new ColumnBuffer(type, nulls, vars);
+    Random random = ThreadLocalRandom.current();
+
+    int remaining = NUM_VARS;
+    while (remaining > 0) {
+      int toExtract = random.nextInt(remaining) + 1;
+      ColumnBuffer subset = columnBuffer.extractSubset(toExtract);
+      // NUM_VARS - remaining is how many values earlier iterations already extracted.
+      verifyNulls(subset, NUM_VARS - remaining);
+      remaining -= toExtract;
+    }
+  }
+
+  /**
+   * Asserts that bit i of the buffer's nulls BitSet matches the null flag chosen for position
+   * i + shift of the original column.
+   *
+   * @param buffer subset returned by extractSubset()
+   * @param shift number of values extracted before this subset
+   */
+  private static void verifyNulls(ColumnBuffer buffer, int shift) {
+    BitSet nulls = buffer.getNulls();
+    for (int i = 0; i < buffer.size(); i++) {
+      Assert.assertEquals("BitSet in parent and subset not the same.",
+          nullIndices.contains(i + shift), nulls.get(i));
+    }
+  }
+
+  /** Runs the addValue()/get() check that corresponds to the parameterized type. */
+  @Test
+  public void testAddValues() {
+    switch (type) {
+      case BOOLEAN_TYPE:
+        testBooleanValues();
+        break;
+      case TINYINT_TYPE:
+      case SMALLINT_TYPE:
+      case INT_TYPE:
+      case BIGINT_TYPE:
+        testAllIntegerTypeValues();
+        break;
+      case DOUBLE_TYPE:
+      case FLOAT_TYPE:
+        testFloatAndDoubleValues();
+        break;
+      case BINARY_TYPE:
+        testBinaryValues();
+        break;
+      case STRING_TYPE:
+        testStringValues();
+        break;
+      default:
+        throw new IllegalArgumentException("Invalid type " + type);
+    }
+  }
+
+  /** Checks that MIN_VALUE and MAX_VALUE of each integer type survive addValue() and get(). */
+  private void testAllIntegerTypeValues() {
+    Map<Type, List<Object>> integerTypesAndValues = new LinkedHashMap<Type, List<Object>>();
+
+    // Add TINYINT values
+    integerTypesAndValues.put(Type.TINYINT_TYPE, Arrays.<Object>asList(
+        Byte.MIN_VALUE, Byte.MAX_VALUE
+    ));
+
+    // Add SMALLINT values
+    integerTypesAndValues.put(Type.SMALLINT_TYPE, Arrays.<Object>asList(
+        Short.MIN_VALUE, Short.MAX_VALUE
+    ));
+
+    // Add INT values
+    integerTypesAndValues.put(Type.INT_TYPE, Arrays.<Object>asList(
+        Integer.MIN_VALUE, Integer.MAX_VALUE
+    ));
+
+    // Add BIGINT values
+    integerTypesAndValues.put(Type.BIGINT_TYPE, Arrays.<Object>asList(
+        Long.MIN_VALUE, Long.MAX_VALUE
+    ));
+
+    // Validate all integer type values are stored correctly
+    for (Map.Entry<Type, List<Object>> entry : integerTypesAndValues.entrySet()) {
+      Type columnType = entry.getKey();
+      List<Object> values = entry.getValue();
+
+      ColumnBuffer c = new ColumnBuffer(columnType);
+      for (Object v : values) {
+        c.addValue(columnType, v);
+      }
+
+      assertEquals(columnType, c.getType());
+      assertEquals(values.size(), c.size());
+
+      for (int i = 0; i < c.size(); i++) {
+        assertEquals(values.get(i), c.get(i));
+      }
+    }
+  }
+
+  /** Checks addValue()/get() for FLOAT and DOUBLE columns. */
+  private void testFloatAndDoubleValues() {
+    ColumnBuffer floatColumn = new ColumnBuffer(Type.FLOAT_TYPE);
+    floatColumn.addValue(Type.FLOAT_TYPE, 1.1f);
+    floatColumn.addValue(Type.FLOAT_TYPE, 2.033f);
+
+    // The column reports FLOAT_TYPE, but its values are expected back as doubles
+    assertEquals(Type.FLOAT_TYPE, floatColumn.getType());
+    assertEquals(2, floatColumn.size());
+    assertEquals(1.1, floatColumn.get(0));
+    assertEquals(2.033, floatColumn.get(1));
+
+    ColumnBuffer doubleColumn = new ColumnBuffer(Type.DOUBLE_TYPE);
+    doubleColumn.addValue(Type.DOUBLE_TYPE, 1.1);
+    doubleColumn.addValue(Type.DOUBLE_TYPE, 2.033);
+
+    assertEquals(Type.DOUBLE_TYPE, doubleColumn.getType());
+    assertEquals(2, doubleColumn.size());
+    assertEquals(1.1, doubleColumn.get(0));
+    assertEquals(2.033, doubleColumn.get(1));
+  }
+
+  /** Checks addValue()/get() for a BOOLEAN column. */
+  private void testBooleanValues() {
+    ColumnBuffer boolColumn = new ColumnBuffer(Type.BOOLEAN_TYPE);
+    boolColumn.addValue(Type.BOOLEAN_TYPE, true);
+    boolColumn.addValue(Type.BOOLEAN_TYPE, false);
+
+    assertEquals(Type.BOOLEAN_TYPE, boolColumn.getType());
+    assertEquals(2, boolColumn.size());
+    assertEquals(true, boolColumn.get(0));
+    assertEquals(false, boolColumn.get(1));
+  }
+
+  /** Checks addValue()/get() for a STRING column, including special characters. */
+  private void testStringValues() {
+    ColumnBuffer stringColumn = new ColumnBuffer(Type.STRING_TYPE);
+    stringColumn.addValue(Type.STRING_TYPE, "12abc456");
+    stringColumn.addValue(Type.STRING_TYPE, "~special$&string");
+
+    assertEquals(Type.STRING_TYPE, stringColumn.getType());
+    assertEquals(2, stringColumn.size());
+    assertEquals("12abc456", stringColumn.get(0));
+    assertEquals("~special$&string", stringColumn.get(1));
+  }
+
+  /** Checks addValue()/get() for a BINARY column; get() is compared as a byte array. */
+  private void testBinaryValues() {
+    ColumnBuffer binaryColumn = new ColumnBuffer(Type.BINARY_TYPE);
+    binaryColumn.addValue(Type.BINARY_TYPE, new byte[]{-1, 0, 3, 4});
+
+    assertEquals(Type.BINARY_TYPE, binaryColumn.getType());
+    assertEquals(1, binaryColumn.size());
+    assertArrayEquals(new byte[]{-1, 0, 3, 4}, (byte[]) binaryColumn.get(0));
+  }
+}
diff --git a/service/src/java/org/apache/hive/service/cli/ColumnBasedSet.java b/service/src/java/org/apache/hive/service/cli/ColumnBasedSet.java
index 9cbe89c..3774426 100644
--- a/service/src/java/org/apache/hive/service/cli/ColumnBasedSet.java
+++ b/service/src/java/org/apache/hive/service/cli/ColumnBasedSet.java
@@ -137,7 +137,7 @@ public ColumnBasedSet extractSubset(int maxRows) {
List subset = new ArrayList();
for (int i = 0; i < columns.size(); i++) {
- subset.add(columns.get(i).extractSubset(0, numRows));
+ subset.add(columns.get(i).extractSubset(numRows));
}
ColumnBasedSet result = new ColumnBasedSet(descriptors, subset, startOffset);
startOffset += numRows;
diff --git a/service/src/test/org/apache/hive/service/cli/TestColumn.java b/service/src/test/org/apache/hive/service/cli/TestColumn.java
deleted file mode 100644
index 6589fc3..0000000
--- a/service/src/test/org/apache/hive/service/cli/TestColumn.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hive.service.cli;
-
-import org.apache.hadoop.hive.serde2.thrift.ColumnBuffer;
-import org.apache.hadoop.hive.serde2.thrift.Type;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-
-public class TestColumn {
- @Test
- public void testAllIntegerTypeValues() {
- Map> integerTypesAndValues = new LinkedHashMap>();
-
- // Add TINYINT values
- integerTypesAndValues.put(Type.TINYINT_TYPE, Arrays.asList(
- Byte.MIN_VALUE, Byte.MAX_VALUE
- ));
-
- // Add SMALLINT values
- integerTypesAndValues.put(Type.SMALLINT_TYPE, Arrays.asList(
- Short.MIN_VALUE, Short.MIN_VALUE
- ));
-
- // Add INT values
- integerTypesAndValues.put(Type.INT_TYPE, Arrays.asList(
- Integer.MIN_VALUE, Integer.MAX_VALUE
- ));
-
- // Add BIGINT values
- integerTypesAndValues.put(Type.BIGINT_TYPE, Arrays.asList(
- Long.MIN_VALUE, Long.MAX_VALUE
- ));
-
- // Validate all integer type values are stored correctly
- for (Map.Entry entry : integerTypesAndValues.entrySet()) {
- Type type = (Type)entry.getKey();
- List values = (List)entry.getValue();
-
- ColumnBuffer c = new ColumnBuffer(type);
- for (Object v : values) {
- c.addValue(type, v);
- }
-
- assertEquals(type, c.getType());
- assertEquals(values.size(), c.size());
-
- for (int i=0; i