diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
index 7ee263d..2c86285 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hive.ql.io.orc.encoded.OrcBatchKey;
 import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch;
 import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.SargApplier;
 import org.apache.hadoop.hive.ql.io.orc.TreeReaderFactory;
 import org.apache.hadoop.hive.ql.io.orc.WriterImpl;
 import org.apache.orc.OrcProto;
@@ -49,6 +50,7 @@ private OrcStripeMetadata[] stripes;
   private final boolean skipCorrupt; // TODO: get rid of this
   private final QueryFragmentCounters counters;
+  private SargApplier sargApp;
 
   public OrcEncodedDataConsumer(
       Consumer<ColumnVectorBatch> consumer, int colCount, boolean skipCorrupt,
@@ -71,6 +73,10 @@ public void setStripeMetadata(OrcStripeMetadata m) {
     stripes[m.getStripeIx()] = m;
   }
 
+  public void setSargApplier(SargApplier sargApp) {
+    this.sargApp = sargApp;
+  }
+
   @Override
   protected void decodeBatch(OrcEncodedColumnBatch batch,
       Consumer<ColumnVectorBatch> downstreamConsumer) {
@@ -117,9 +123,33 @@ protected void decodeBatch(OrcEncodedColumnBatch batch,
           cvb.cols[idx] = (ColumnVector)columnReaders[idx].nextVector(cvb.cols[idx], batchSize);
         }
 
+        // apply row-level sarg
+        SargApplier.PickRowResult result = sargApp.pickRows(cvb.cols, cvb.size);
+
+        if (result.isPartRowsSelected()) {
+          // shrink CV
+          int destIdx = 0;
+          for (int selIdx = 0; selIdx < result.size; ++selIdx) {
+            int srcIdx = result.selected[selIdx];
+            if (destIdx != srcIdx) {
+              for (int col = 0; col < cvb.cols.length; ++col) {
+                cvb.cols[col].setElement(destIdx, srcIdx, cvb.cols[col]);
+              }
+            }
+            ++destIdx;
+          }
+          cvb.size = destIdx;
+        } else if (result.isAllRowsSelected()) {
+          // do nothing
+        } else {
+          assert result.isEmpty();
+          cvbPool.offer(cvb);
+          continue;
+        }
+
         // we are done reading a batch, send it to consumer for processing
         downstreamConsumer.consumeData(cvb);
-        counters.incrCounter(LlapIOCounters.ROWS_EMITTED, batchSize);
+        counters.incrCounter(LlapIOCounters.ROWS_EMITTED, cvb.size);
       }
       counters.incrTimeCounter(LlapIOCounters.DECODE_TIME_NS, startTime);
       counters.incrCounter(LlapIOCounters.NUM_VECTOR_BATCHES, maxBatchesRG);
diff --git llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index fb0867d..8723ab7 100644
--- llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -693,6 +693,7 @@ private boolean determineRgsToRead(boolean[] globalIncludes, int rowIndexStride,
       String[] colNamesForSarg = OrcInputFormat.getSargColumnNames(
           columnNames, types, globalIncludes, fileMetadata.isOriginalFormat());
       sargApp = new SargApplier(sarg, colNamesForSarg, rowIndexStride, types, globalIncludes.length);
+      consumer.setSargApplier(sargApp);
     }
     boolean hasAnyData = false;
     // readState should have been initialized by this time with an empty array.
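The compaction loop in decodeBatch() above moves the selected rows to the front of each ColumnVector via a self-referencing setElement() call. A minimal sketch of that pattern on a single LongColumnVector (illustrative only, not part of the patch; the selected indices stand in for PickRowResult.selected):

    // Compact rows {1, 3} to the front, the way decodeBatch does for every column in the batch.
    LongColumnVector cv = new LongColumnVector(4);
    cv.vector[0] = 10; cv.vector[1] = 11; cv.vector[2] = 12; cv.vector[3] = 13;
    int[] selected = new int[] {1, 3};   // stands in for PickRowResult.selected
    int destIdx = 0;
    for (int srcIdx : selected) {
      if (destIdx != srcIdx) {
        cv.setElement(destIdx, srcIdx, cv);   // self-copy within the same vector
      }
      ++destIdx;
    }
    // destIdx (2) becomes the new batch size; cv.vector[0] == 11 and cv.vector[1] == 13

This self-copy is also why BytesColumnVector.setElement() gains a this == inputVector branch later in the patch.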
diff --git llap-server/src/test/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducerTest.java llap-server/src/test/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducerTest.java
new file mode 100644
index 0000000..4a3bf41
--- /dev/null
+++ llap-server/src/test/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducerTest.java
@@ -0,0 +1,550 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.io.decode;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.cache.BuddyAllocator;
+import org.apache.hadoop.hive.llap.cache.BufferUsageManager;
+import org.apache.hadoop.hive.llap.cache.EvictionAwareAllocator;
+import org.apache.hadoop.hive.llap.cache.LowLevelCacheImpl;
+import org.apache.hadoop.hive.llap.cache.LowLevelCacheMemoryManager;
+import org.apache.hadoop.hive.llap.cache.LowLevelCachePolicy;
+import org.apache.hadoop.hive.llap.cache.LowLevelLrfuCachePolicy;
+import org.apache.hadoop.hive.llap.cache.SimpleBufferManager;
+import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
+import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
+import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
+import org.apache.hadoop.hive.llap.metrics.LlapDaemonQueueMetrics;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
+import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf.Operator;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf.Type;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.sql.Date;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+
+import static org.junit.Assert.*;
+
+public class OrcColumnVectorProducerTest {
+
+  public static class MyRow implements Writable {
+    int seq;
+
+    byte atinyint;
+    short asmallint;
+    int aint;
+    long abigint;
+    HiveDecimalWritable adecimal = new HiveDecimalWritable("0");
+    float afloat;
+    double adouble;
+    boolean aboolean;
+    BytesWritable abinary = new BytesWritable();
+    String astring = "";
+    String avarchar = "";
+    String achar = "";
+    Timestamp atimestamp = new Timestamp(0);
+    Date adate = new Date(0);
+
+    MyRow(int seq) {
+      this.seq = seq;
+    }
+
+    public MyRow setAboolean(boolean v) {
+      aboolean = v;
+      return this;
+    }
+
+    public MyRow setAint(int v) {
+      aint = v;
+      return this;
+    }
+
+    public MyRow setAstring(String v) {
+      this.astring = v;
+      return this;
+    }
+
+    public MyRow setAtimestamp(Timestamp v) {
+      this.atimestamp = v;
+      return this;
+    }
+
+    public MyRow setAfloat(float v) {
+      this.afloat = v;
+      return this;
+    }
+
+    public MyRow setAdecimal(HiveDecimalWritable v) {
+      this.adecimal = v;
+      return this;
+    }
+
+    @Override
+    public void write(DataOutput dataOutput) throws IOException {
+      throw new UnsupportedOperationException("no write");
+    }
+
+    @Override
+    public void readFields(DataInput dataInput) throws IOException {
+      throw new UnsupportedOperationException("no read");
+    }
+
+    static String getColumnNamesProperty() {
+      return "seq,atinyint,asmallint,aint,abigint,adecimal,afloat,adouble,aboolean,abinary,astring,avarchar,achar,atimestamp,adate";
+    }
+
+    static String getColumnTypesProperty() {
+      return "int:tinyint:smallint:int:bigint:decimal:float:double:boolean:binary:string:varchar(10):char(10):timestamp:date";
+    }
+  }
+
+  class TestPredicate {
+    Operator op;
+    String fieldName;
+    Type type;
+    Object literal;
+    Object[] literals;
+
+    TestPredicate(Operator op, String fieldName, Type type) {
+      this.op = op;
+      this.fieldName = fieldName;
+      this.type = type;
+    }
+
+    TestPredicate(Operator op, String fieldName, Type type, Object literal) {
+      this(op, fieldName, type);
+      this.literal = literal;
+    }
+
+    TestPredicate(Operator op, String fieldName, Type type, Object[] literals) {
+      this(op, fieldName, type);
+      this.literals = literals;
+    }
+  }
+
+  class TestDataBuilder {
+    MyRow[] rows;
+    int[] expectedRowIdxs;
+    TestPredicate testPredicate;
+    boolean shouldThrowError;
+    int sargedRows;
+    String explain;
+
+    TestDataBuilder(String explain) {
+      this.explain = explain;
+
+      MyRow[] rows = new MyRow[VectorizedRowBatch.DEFAULT_SIZE];
+      for (int i = 0; i < rows.length; ++i) {
+        rows[i] = new MyRow(i);
+      }
+      this.rows = rows;
+    }
+
+    TestDataBuilder sargedRowsShouldBe(int rows) {
+      sargedRows = rows;
+      return this;
+    }
+
+    TestDataBuilder setAllColumnValue(String fieldName, Object literal) {
+      setColumnValue(fieldName, literal, VectorizedRowBatch.DEFAULT_SIZE);
+      return this;
+    }
+
+    TestDataBuilder setColumnValue(String fieldName, Object literal, int rows) {
+      this.expectedRowIdxs = new int[rows];
+
+      for (int i = 0; i < rows; ++i) {
+        Method method;
+        try {
+          String methodName = "set" + fieldName.substring(0, 1).toUpperCase() + fieldName.substring(1);
+          method = this.rows[i].getClass().getMethod(methodName, new Class[]{literal.getClass()});
+          method.invoke(this.rows[i], new Object[]{literal});
+
+          this.expectedRowIdxs[i] = i;
+        } catch (Exception e) {
+          assertTrue(false);
+        }
+      }
+      return this;
+    }
+
+    TestDataBuilder setColumnNull(String fieldName, Class cls, int rows) {
+      this.expectedRowIdxs = new int[rows];
+
+      for (int i = 0; i < rows; ++i) {
+        Method method;
+        try {
+          String methodName = "set" + fieldName.substring(0, 1).toUpperCase() + fieldName.substring(1);
+          method = this.rows[i].getClass().getMethod(methodName, new Class[]{cls});
+          method.invoke(this.rows[i], new Object[]{null});
+
+          this.expectedRowIdxs[i] = i;
+        } catch (Exception e) {
+          assertTrue(false);
+        }
+      }
+      return this;
+    }
+
+    TestDataBuilder predicate(Operator predicateOp, String predicateFieldName, Type predicateType,
+        Object literal) {
+      testPredicate = new TestPredicate(predicateOp, predicateFieldName, predicateType, literal);
+      return this;
+    }
+
+    TestDataBuilder predicate(Operator predicateOp, String predicateFieldName, Type predicateType,
+        Object[] literals) {
+      testPredicate = new TestPredicate(predicateOp, predicateFieldName, predicateType, literals);
+      return this;
+    }
+
+    TestDataBuilder predicate(Operator predicateOp, String predicateFieldName, Type predicateType) {
+      testPredicate = new TestPredicate(predicateOp, predicateFieldName, predicateType);
+      return this;
+    }
+
+    TestDataBuilder shouldThrowError() {
+      shouldThrowError = true;
+      return this;
+    }
+
+    TestData build() {
+      return new TestData(explain, testPredicate, rows, expectedRowIdxs, shouldThrowError, sargedRows);
+    }
+  }
+
+
+  class TestData {
+    TestPredicate testPredicate;
+    MyRow[] rows;
+    int[] expectedRowIdxs;
+    boolean isExpectedTypeError;
+    int sargedRows;
+    String explain;
+
+    TestData(String explain, TestPredicate predicate, MyRow[] rows, int[] expectedRowIdxs, boolean shouldThrowError, int sargedRows) {
+      this.explain = explain;
+      this.testPredicate = predicate;
+      this.rows = rows;
+      this.expectedRowIdxs = expectedRowIdxs;
+      isExpectedTypeError = shouldThrowError;
+      this.sargedRows = sargedRows;
+    }
+
+
+    private Object getLiteral(Type type) {
+      switch (type) {
+        case LONG:
+          return new Long(1234);
+        case FLOAT:
+          return new Double(12345);
+        case STRING:
+          return new String("foobar");
+        case DATE:
+          return new Date(999);
+        case DECIMAL:
+          return new HiveDecimalWritable("1844674407");
+        case TIMESTAMP:
+          return new Timestamp(1460125932000L);
+        case BOOLEAN:
+          return Boolean.TRUE;
+        default:
+          assertFalse(true);
+          break;
+      }
+      return null;
+    }
+  }
+
+  TestData[] testData;
+
+  @Before
+  public void setUp() {
+    long prev_ts = 1460125931999L;
+    long middle_ts = 1460125932000L;
+    long after_ts = 1460125932001L;
+    long after_after_ts = 1460125932002L;
+
+    testData = new TestData[]{
+        // where atimestamp = xxx
+        new TestDataBuilder("where atimestamp = xxx, NO matched rows")
+            .setColumnValue("atimestamp", new Timestamp(middle_ts), 0)
+            .predicate(Operator.EQUALS, "atimestamp", Type.TIMESTAMP, new Timestamp(middle_ts)) // where atimestamp = xxx
+            .sargedRowsShouldBe(0).build(),
+        new TestDataBuilder("where atimestamp = xxx, ONE row matches")
+            .setColumnValue("atimestamp", new Timestamp(middle_ts), 1)
+            .predicate(Operator.EQUALS, "atimestamp", Type.TIMESTAMP, new Timestamp(middle_ts)) // where atimestamp = xxx
+            .sargedRowsShouldBe(1).build(),
+        new TestDataBuilder("where atimestamp = xxx, ALL rows match")
+            .setColumnValue("atimestamp", new Timestamp(middle_ts), VectorizedRowBatch.DEFAULT_SIZE)
+            .predicate(Operator.EQUALS, "atimestamp", Type.TIMESTAMP, new Timestamp(middle_ts)) // where atimestamp = xxx
+            .sargedRowsShouldBe(VectorizedRowBatch.DEFAULT_SIZE).build(),
+
+        // where atimestamp < xxx
+        new TestDataBuilder("where atimestamp < xxx, NO matched rows")
+            .setAllColumnValue("atimestamp", new Timestamp(after_ts))
+            .predicate(Operator.LESS_THAN, "atimestamp", Type.TIMESTAMP, new Timestamp(middle_ts))
+            .sargedRowsShouldBe(0).build(),
+        new TestDataBuilder("where atimestamp < xxx, ONE matched row (smaller value than predicate)")
+            .setAllColumnValue("atimestamp", new Timestamp(after_ts))
+            .setColumnValue("atimestamp", new Timestamp(prev_ts), 1)
+            .predicate(Operator.LESS_THAN, "atimestamp", Type.TIMESTAMP, new Timestamp(middle_ts))
+            .sargedRowsShouldBe(1).build(),
+        new TestDataBuilder("where atimestamp < xxx, all rows have a smaller value than the predicate")
+            .setAllColumnValue("atimestamp", new Timestamp(prev_ts))
+            .predicate(Operator.LESS_THAN, "atimestamp", Type.TIMESTAMP, new Timestamp(middle_ts))
+            .sargedRowsShouldBe(VectorizedRowBatch.DEFAULT_SIZE).build(),
+
+        // where atimestamp <= xxx
+        new TestDataBuilder("where atimestamp <= xxx, NO matched rows")
+            .setAllColumnValue("atimestamp", new Timestamp(after_ts))
+            .predicate(Operator.LESS_THAN_EQUALS, "atimestamp", Type.TIMESTAMP, new Timestamp(middle_ts))
+            .sargedRowsShouldBe(0).build(),
+        new TestDataBuilder("where atimestamp <= xxx, ONE matched row with an equal value")
+            .setAllColumnValue("atimestamp", new Timestamp(after_ts))
+            .setColumnValue("atimestamp", new Timestamp(middle_ts), 1)
+            .predicate(Operator.LESS_THAN_EQUALS, "atimestamp", Type.TIMESTAMP, new Timestamp(middle_ts))
+            .sargedRowsShouldBe(1).build(),
+        new TestDataBuilder("where atimestamp <= xxx, the table has ONE matched row with a smaller value than the predicate")
+            .setAllColumnValue("atimestamp", new Timestamp(after_ts))
+            .setColumnValue("atimestamp", new Timestamp(prev_ts), 1)
+            .predicate(Operator.LESS_THAN_EQUALS, "atimestamp", Type.TIMESTAMP, new Timestamp(middle_ts))
+            .sargedRowsShouldBe(1).build(),
+        new TestDataBuilder("where atimestamp <= xxx, all rows have a value equal to the predicate")
+            .setAllColumnValue("atimestamp", new Timestamp(middle_ts))
+            .predicate(Operator.LESS_THAN_EQUALS, "atimestamp", Type.TIMESTAMP, new Timestamp(middle_ts))
+            .sargedRowsShouldBe(VectorizedRowBatch.DEFAULT_SIZE).build(),
+        new TestDataBuilder("where atimestamp <= xxx, all rows have a smaller value than the predicate")
+            .setAllColumnValue("atimestamp", new Timestamp(prev_ts))
+            .predicate(Operator.LESS_THAN_EQUALS, "atimestamp", Type.TIMESTAMP, new Timestamp(middle_ts))
+            .sargedRowsShouldBe(VectorizedRowBatch.DEFAULT_SIZE).build(),
+
+        // where atimestamp between A and B
+        new TestDataBuilder("where atimestamp BETWEEN prev AND after, ONE row has middle")
+            .setAllColumnValue("atimestamp", new Timestamp(after_after_ts))
+            .setColumnValue("atimestamp", new Timestamp(middle_ts), 1)
+            .predicate(Operator.BETWEEN, "atimestamp", Type.TIMESTAMP, new Timestamp[]{new Timestamp(prev_ts), new Timestamp(after_ts)})
+            .sargedRowsShouldBe(1).build(),
+
+        // where atimestamp in (A, B)
+        new TestDataBuilder("where atimestamp IN (middle, after), ONE row has middle")
+            .setAllColumnValue("atimestamp", new Timestamp(after_after_ts))
+            .setColumnValue("atimestamp", new Timestamp(middle_ts), 1)
+            .predicate(Operator.IN, "atimestamp", Type.TIMESTAMP, new Timestamp[]{new Timestamp(middle_ts), new Timestamp(after_ts)})
+            .sargedRowsShouldBe(1).build(),
+
+        // WHERE atimestamp IS NULL
+        new TestDataBuilder("where atimestamp IS NULL, ONE row has null")
+            .setAllColumnValue("atimestamp", new Timestamp(after_after_ts))
+            .setColumnNull("atimestamp", Timestamp.class, 1)
+            .predicate(Operator.IS_NULL, "atimestamp", Type.TIMESTAMP)
+            .sargedRowsShouldBe(1).build(),
+
+
+
+        // error cases
+
+        // where atimestamp = 1234L, raise type error
+        new TestDataBuilder("where atimestamp = 1234L, type mismatch should raise an error")
+            .setColumnValue("atimestamp", new Timestamp(middle_ts), 0)
+            .predicate(Operator.EQUALS, "atimestamp", Type.LONG, new Long(1234))
+            .shouldThrowError().build(),
+    };
+  }
+
+  @Test
+  public void testRowLevelSargByLLAP() throws Exception {
+    // prepare test
+    HiveConf conf = new HiveConf(Driver.class);
+    LlapDaemonCacheMetrics cacheMetrics = LlapDaemonCacheMetrics.create("", "");
+
+    LlapDaemonQueueMetrics queueMetrics = LlapDaemonQueueMetrics.create("", "", new int[]{1});
+
+    LowLevelCachePolicy cachePolicy = new LowLevelLrfuCachePolicy(conf);
+    LowLevelCacheMemoryManager memManager = new LowLevelCacheMemoryManager(
+        conf, cachePolicy, cacheMetrics);
+    EvictionAwareAllocator allocator = new BuddyAllocator(conf, memManager, cacheMetrics);
+    BufferUsageManager bufferManager = new SimpleBufferManager(allocator, cacheMetrics);
+
+    LowLevelCacheImpl orcCache = new LowLevelCacheImpl(cacheMetrics, cachePolicy, allocator, true);
+
+    OrcColumnVectorProducer ocp = new OrcColumnVectorProducer(null, orcCache, bufferManager, conf, cacheMetrics, queueMetrics);
+
+    Properties properties = new Properties();
+    properties.setProperty("columns", MyRow.getColumnNamesProperty());
+    properties.setProperty("columns.types", MyRow.getColumnTypesProperty());
+    StructObjectInspector inspector;
+    inspector = (StructObjectInspector)
+        ObjectInspectorFactory.getReflectionObjectInspector(MyRow.class,
+            ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+
+    Path workDir = new Path(System.getProperty("test.tmp.dir","target/tmp"));
+
+    List<Integer> ids = new ArrayList<Integer>();
+    String[] fields = MyRow.getColumnNamesProperty().split(",");
+    for (int i = 0; i < fields.length; ++i) {
+      ids.add(i);
+    }
+
+    JobConf jconf = new JobConf();
+    FileSystem fs = FileSystem.getLocal(jconf);
+    HiveOutputFormat outFormat = new OrcOutputFormat();
+    SerDe serde = new OrcSerde();
+
+
+    // test body
+    int fileSeq = 0;
+    for (final TestData td: testData) {
+      // write test rows into ORC file
+      Path testFilePath = new Path(workDir, "TestInputOutputFormat." + this.getClass().getSimpleName() + "."
+          + String.valueOf(fileSeq++) + ".orc");
+      fs.delete(testFilePath, false);
+
+      org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter writer =
+          outFormat.getHiveRecordWriter(jconf, testFilePath, MyRow.class, true,
+              properties, Reporter.NULL);
+      int x = 0;
+      for (MyRow row : td.rows) {
+        writer.write(serde.serialize(row, inspector));
+      }
+      writer.close(true);
+
+      // get filesplits
+      InputFormat in = new OrcInputFormat();
+      FileInputFormat.setInputPaths(jconf, testFilePath.toString());
+      InputSplit[] splits = in.getSplits(jconf, 1);
+      assertEquals(splits.length, 1);
+
+      // get Sarg from test data
+      SearchArgument sarg = getSearchArgument(td.testPredicate);
+
+      // make test consumer
+      final boolean[] callbackCalled = new boolean[]{false};
+      final boolean[] done = new boolean[]{false};
+      final boolean[] error = new boolean[]{false};
+      Consumer<ColumnVectorBatch> consumer = new Consumer<ColumnVectorBatch>() {
+        public void consumeData(ColumnVectorBatch cvb) {
+          assertFalse("consumeData() is called twice!", callbackCalled[0]);
+          callbackCalled[0] = true;
+
+          assertEquals(td.expectedRowIdxs.length, cvb.size);
+
+          LongColumnVector seqCV = (LongColumnVector) cvb.cols[0];
+          for (int i = 0; i < td.sargedRows; ++i) {
+            int seq = td.expectedRowIdxs[i];
+            assertEquals(seq, seqCV.vector[i]);
+          }
+        }
+
+        public void setDone() {
+          assertFalse("setDone() is called twice!", done[0]);
+          done[0] = true;
+        }
+
+        public void setError(Throwable t) {
+          error[0] = true;
+        }
+      };
+
+
+      // do the test by ReadPipeline
+      ReadPipeline rp = ocp.createReadPipeline(consumer, (FileSplit) splits[0], ids,
+          sarg, fields, new QueryFragmentCounters(conf, null));
+      Callable<Void> cal = rp.getReadCallable();
+
+      cal.call();
+
+      // check expected rows are sarged
+      if (td.sargedRows > 0) {
+        assertTrue("consumeData() should be called, testData:" + td.explain, callbackCalled[0]);
+      } else {
+        assertFalse("consumeData() should NOT be called:" + td.explain, callbackCalled[0]);
+      }
+
+      // type mismatch predicate should raise exception
+      if (td.isExpectedTypeError) {
+        assertTrue("setError() should be called:" + td.explain, error[0]);
+      } else {
+        assertTrue("setDone() should be called:" + td.explain, done[0]);
+      }
+    }
+
+  }
+
+  private SearchArgument getSearchArgument(TestPredicate tp) {
+    SearchArgument.Builder builder = SearchArgumentFactory.newBuilder();
+    switch (tp.op) {
+      case EQUALS:
+        builder.equals(tp.fieldName, tp.type, tp.literal);
+        break;
+      case NULL_SAFE_EQUALS:
+        builder.nullSafeEquals(tp.fieldName, tp.type, tp.literal);
+        break;
+      case LESS_THAN:
+        builder.lessThan(tp.fieldName, tp.type, tp.literal);
+        break;
+      case LESS_THAN_EQUALS:
+        builder.lessThanEquals(tp.fieldName, tp.type, tp.literal);
+        break;
+      case IN:
+        builder.in(tp.fieldName, tp.type, tp.literals);
+        break;
+      case BETWEEN:
+        builder.between(tp.fieldName, tp.type, tp.literals[0], tp.literals[1]);
+        break;
+      case IS_NULL:
+        builder.isNull(tp.fieldName, tp.type);
+        break;
+      default:
+        assertFalse(true);
+        break;
+    }
+    return builder.build();
+  }
+}
\ No newline at end of file
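For reference, the single-leaf SearchArguments this test feeds into createReadPipeline() can be built directly with the same builder calls getSearchArgument() uses above; a minimal sketch (the literal value is arbitrary and only illustrative):

    SearchArgument sarg = SearchArgumentFactory.newBuilder()
        .equals("atimestamp", PredicateLeaf.Type.TIMESTAMP, new Timestamp(1460125932000L))
        .build();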
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
index 3975409..330ebc6 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
@@ -364,6 +364,99 @@ static TruthValue evaluatePredicate(ColumnStatistics stats,
     return evaluatePredicateRange(predicate, minValue, maxValue, stats.hasNull(), bloomFilter);
   }
 
+  /**
+   * Apply a row-level sarg predicate to a specific column.
+   * @param cv the operand column of the sarg predicate
+   * @param cvType ORC type of cv
+   * @param predicate sarg predicate
+   * @param rowSize number of rows to evaluate
+   * @return per-row truth values for the predicate
+   */
+  public static TruthValue[] evaluatePredicateRowLevel(ColumnVector cv,
+                                                       final OrcProto.Type cvType,
+                                                       PredicateLeaf predicate,
+                                                       int rowSize) {
+    final Object predObj = getBaseObjectForComparison(predicate.getType(), predicate.getLiteral());
+
+    final List<Object> predObjList;
+    List<Object> origPredObjList = predicate.getLiteralList();
+    if (origPredObjList != null) {
+      predObjList = new ArrayList<>();
+      for (Object arg : origPredObjList) {
+        Object tmp = getBaseObjectForComparison(predicate.getType(), arg);
+        predObjList.add(tmp);
+      }
+    } else {
+      predObjList = null;
+    }
+
+    ColumnVector.SargPredicateEvaluator sargPredicateEvaluator = new ColumnVector.SargPredicateEvaluator() {
+      public TruthValue eval(PredicateLeaf predicate, Object row) {
+
+        // Boolean columns are materialized as longs (0/1) by the tree reader
+        if (cvType.getKind() == OrcProto.Type.Kind.BOOLEAN) {
+          if (((Long)row).longValue() == 0L) {
+            row = Boolean.FALSE;
+          } else {
+            row = Boolean.TRUE;
+          }
+        }
+
+        Object rowValue = getBaseObjectForComparison(predicate.getType(), row);
+        int cmp;
+        TruthValue tv;
+
+        switch (predicate.getOperator()) {
+          case NULL_SAFE_EQUALS:
+          case EQUALS:
+            cmp = ((Comparable)predObj).compareTo(rowValue);
+            tv = cmp == 0 ? TruthValue.YES : TruthValue.NO;
+            break;
+          case LESS_THAN:
+            cmp = ((Comparable)predObj).compareTo(rowValue);
+            tv = cmp > 0 ? TruthValue.YES : TruthValue.NO;
+            break;
+          case LESS_THAN_EQUALS:
+            cmp = ((Comparable)predObj).compareTo(rowValue);
+            tv = cmp >= 0 ? TruthValue.YES : TruthValue.NO;
+            break;
+          case IN:
+            tv = TruthValue.NO;
+            for (Object listPredObj : predObjList) {
+              cmp = ((Comparable)listPredObj).compareTo(rowValue);
+              if (cmp == 0) {
+                tv = TruthValue.YES;
+                break;
+              }
+            }
+            break;
+          case BETWEEN:
+            Object predObj1 = predObjList.get(0);
+            cmp = ((Comparable)predObj1).compareTo(rowValue);
+            tv = TruthValue.NO;
+            if (cmp <= 0) {
+              Object predObj2 = predObjList.get(1);
+              cmp = ((Comparable)predObj2).compareTo(rowValue);
+              if (cmp >= 0) {
+                tv = TruthValue.YES;
+              }
+            }
+            break;
+          case IS_NULL:
+            tv = rowValue == null ? TruthValue.YES : TruthValue.NO;
+            break;
+          default:
+            assert false;
+            tv = TruthValue.YES_NO_NULL;
+            break;
+        }
+        return tv;
+      }
+    };
+
+    return cv.applySargPredicate(predicate, sargPredicateEvaluator, rowSize);
+  }
+
   static TruthValue evaluatePredicateRange(PredicateLeaf predicate, Object min,
       Object max, boolean hasNull, BloomFilterIO bloomFilter) {
     // if we didn't have any values, everything must have been null
@@ -711,6 +804,8 @@ private static Object getBaseObjectForComparison(PredicateLeaf.Type type, Object
     private final long rowIndexStride;
     // same as the above array, but indices are set to true
     private final boolean[] sargColumns;
+    private final List<OrcProto.Type> types;
+    private TruthValue[][] rowLevelResult;
 
     public SargApplier(SearchArgument sarg, String[] columnNames, long rowIndexStride,
         List<OrcProto.Type> types, int includedCount) {
@@ -726,6 +821,8 @@ public SargApplier(SearchArgument sarg, String[] columnNames, long rowIndexStrid
           sargColumns[i] = true;
         }
       }
+      this.types = types;
+      rowLevelResult = new TruthValue[sargLeaves.size()][];
     }
 
     /**
@@ -780,6 +877,63 @@ public SargApplier(SearchArgument sarg, String[] columnNames, long rowIndexStrid
       return hasSkipped ? ((hasSelected || !returnNone) ? result : READ_NO_RGS) : READ_ALL_RGS;
     }
 
+    /**
+     * Apply the row-level sarg to each column's values.
+     *
+     * @param cols the operand columns of the search argument
+     * @param rowSize number of rows to evaluate
+     * @return a PickRowResult containing the indices of the selected rows
+     */
+    public PickRowResult pickRows(ColumnVector[] cols, int rowSize) {
+      for (int pred = 0; pred < sargLeaves.size(); ++pred) {
+        PredicateLeaf predicate = sargLeaves.get(pred);
+        int columnIx = filterColumns[pred];
+        // column 0 is the root struct; cols[] holds its children, hence the -1
+        int colIdx = columnIx - 1;
+        rowLevelResult[pred] = RecordReaderImpl.evaluatePredicateRowLevel(cols[colIdx],
+            types.get(columnIx), predicate, rowSize);
+      }
+
+      PickRowResult result = new PickRowResult(rowSize);
+
+      TruthValue[] leafValues = new TruthValue[sargLeaves.size()];
+      for (int row = 0; row < rowSize; ++row) {
+        for (int pred = 0; pred < leafValues.length; ++pred) {
+          leafValues[pred] = rowLevelResult[pred][row];
+        }
+        if (sarg.evaluate(leafValues).isNeeded()) {
+          result.add(row);
+        }
+      }
+      return result;
+    }
+
+    public static class PickRowResult {
+      public int[] selected;
+      public int size;
+
+      PickRowResult(int size) {
+        selected = new int[size];
+        this.size = 0;
+      }
+
+      void add(int row) {
+        selected[size++] = row;
+      }
+
+      public boolean isEmpty() {
+        return size == 0;
+      }
+
+      public boolean isAllRowsSelected() {
+        return size == selected.length;
+      }
+
+      public boolean isPartRowsSelected() {
+        return size > 0 && size != selected.length;
+      }
+    }
   }
 
   /**
@@ -1075,6 +1229,19 @@ public VectorizedRowBatch nextBatch(VectorizedRowBatch previous) throws IOExcept
       }
       result.size = batchSize;
+
+      SargApplier.PickRowResult pickedResult = sargApp.pickRows(result.cols, result.size);
+      if (pickedResult.isPartRowsSelected()) {
+        result.selected = pickedResult.selected;
+        result.size = pickedResult.size;
+        result.selectedInUse = true;
+      } else if (pickedResult.isAllRowsSelected()) {
+        // do nothing
+      } else {
+        assert pickedResult.isEmpty();
+        result.size = 0;
+      }
+
       advanceToNextRow(reader, rowInStripe + rowBaseInStripe, true);
       return result;
     } catch (IOException e) {
diff --git storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
index 99744cd..3877a62 100644
--- storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
+++ storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
@@ -324,9 +324,15 @@ public void setElement(int outElementNum, int inputElementNum, ColumnVector inpu
     }
     if (inputVector.noNulls || !inputVector.isNull[inputElementNum]) {
       isNull[outElementNum] = false;
-      BytesColumnVector in = (BytesColumnVector) inputVector;
-      setVal(outElementNum, in.vector[inputElementNum],
-          in.start[inputElementNum], in.length[inputElementNum]);
+      if (this != inputVector) {
+        BytesColumnVector in = (BytesColumnVector) inputVector;
+        setVal(outElementNum, in.vector[inputElementNum],
+            in.start[inputElementNum], in.length[inputElementNum]);
+      } else {
+        vector[outElementNum] = vector[inputElementNum];
+        start[outElementNum] = start[inputElementNum];
+        length[outElementNum] = length[inputElementNum];
+      }
     } else {
       isNull[outElementNum] = true;
       noNulls = false;
@@ -375,4 +381,8 @@ public void ensureSize(int size, boolean preserveData) {
       }
     }
   }
+
+  protected Object getValueForSarg(int elementNum) {
+    return new String(vector[elementNum], start[elementNum], length[elementNum]);
+  }
 }
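The new this == inputVector branch above matters for the in-place compaction shown earlier: a self-copy only needs to re-point start/length at bytes that already live in the vector, instead of appending another copy through setVal(). A small sketch, assuming an initialized buffer (illustrative only):

    BytesColumnVector bcv = new BytesColumnVector(4);
    bcv.initBuffer();
    bcv.setVal(0, "aa".getBytes(), 0, 2);
    bcv.setVal(1, "bb".getBytes(), 0, 2);
    bcv.setElement(0, 1, bcv);   // self-copy: row 0 now aliases the bytes of row 1
    // new String(bcv.vector[0], bcv.start[0], bcv.length[0]) => "bb"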
diff --git storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java
index c069a5f..c77cd13 100644
--- storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java
+++ storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java
@@ -18,6 +18,11 @@
 package org.apache.hadoop.hive.ql.exec.vector;
 
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.Arrays;
 
 /**
@@ -31,6 +36,7 @@
  * structure that is used in the inner loop of query execution.
  */
 public abstract class ColumnVector {
+  protected static final Logger LOG = LoggerFactory.getLogger(ColumnVector.class);
 
   /*
    * The current kinds of column vectors.
@@ -214,4 +220,71 @@ public void ensureSize(int size, boolean presesrveData) {
    */
   public abstract void stringifyValue(StringBuilder buffer, int row);
 
+
+  /**
+   * Callback used to apply a predicate to a single row value.
+   */
+  public interface SargPredicateEvaluator {
+    TruthValue eval(PredicateLeaf predicate, Object row);
+  }
+
+  /**
+   * Get the value of a row, used only for SearchArgument evaluation.
+   *
+   * @param elementNum the row index
+   * @return the row value
+   */
+  protected abstract Object getValueForSarg(int elementNum);
+
+  /**
+   * Apply a sarg predicate to the data held by this column vector.
+   *
+   * @param predicate sarg predicate
+   * @param evaluator callback that evaluates the predicate against a single row value
+   * @param rowSize number of rows to evaluate
+   * @return per-row truth values for the predicate
+   */
+  public TruthValue[] applySargPredicate(PredicateLeaf predicate,
+                                         SargPredicateEvaluator evaluator,
+                                         int rowSize) {
+    TruthValue[] result = new TruthValue[rowSize];
+
+    if (isRepeating) {
+      TruthValue[] tv = new TruthValue[1];
+      if (noNulls || !isNull[0]) {
+        tv[0] = evaluator.eval(predicate, getValueForSarg(0));
+      } else {
+        if (predicate.getOperator() == PredicateLeaf.Operator.IS_NULL) {
+          tv[0] = TruthValue.YES;
+        } else {
+          tv[0] = TruthValue.NO_NULL;
+        }
+      }
+      for (int i = 0; i < rowSize; ++i) {
+        result[i] = tv[0];
+      }
+    } else {
+      if (noNulls) {
+        for (int row = 0; row < rowSize; ++row) {
+          result[row] = evaluator.eval(predicate, getValueForSarg(row));
+        }
+      } else {
+        for (int row = 0; row < rowSize; ++row) {
+          TruthValue tv;
+          if (!isNull[row]) {
+            tv = evaluator.eval(predicate, getValueForSarg(row));
+          } else {
+            if (predicate.getOperator() == PredicateLeaf.Operator.IS_NULL) {
+              tv = TruthValue.YES;
+            } else {
+              tv = TruthValue.NO_NULL;
+            }
+          }
+          result[row] = tv;
+        }
+      }
+    }
+    return result;
+  }
+
 }
diff --git storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/DecimalColumnVector.java storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/DecimalColumnVector.java
index 1523ff6..c81be47 100644
--- storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/DecimalColumnVector.java
+++ storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/DecimalColumnVector.java
@@ -153,4 +153,8 @@ public void ensureSize(int size, boolean preserveData) {
       }
     }
   }
+
+  public Object getValueForSarg(int elementNum) {
+    return vector[elementNum];
+  }
 }
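To see what the new applySargPredicate() hook returns, here is a minimal sketch that drives it with a trivial evaluator on a LongColumnVector (illustrative only; the real evaluator is the one built in RecordReaderImpl.evaluatePredicateRowLevel() above, and passing a null PredicateLeaf is only safe because this toy evaluator ignores it):

    LongColumnVector lcv = new LongColumnVector(3);
    lcv.vector[0] = 1; lcv.vector[1] = 2; lcv.vector[2] = 3;
    ColumnVector.SargPredicateEvaluator ge2 = new ColumnVector.SargPredicateEvaluator() {
      public TruthValue eval(PredicateLeaf predicate, Object row) {
        return ((Long) row) >= 2 ? TruthValue.YES : TruthValue.NO;   // "value >= 2"
      }
    };
    TruthValue[] perRow = lcv.applySargPredicate(null, ge2, 3);
    // perRow is {NO, YES, YES}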
diff --git storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/DoubleColumnVector.java storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/DoubleColumnVector.java
index 41dc3e1..b1c27ce 100644
--- storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/DoubleColumnVector.java
+++ storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/DoubleColumnVector.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hive.ql.exec.vector;
 
-import java.io.IOException;
 import java.util.Arrays;
 
 /**
@@ -175,4 +174,8 @@ public void ensureSize(int size, boolean preserveData) {
       }
     }
   }
+
+  public Object getValueForSarg(int elementNum) {
+    return new Double(vector[elementNum]);
+  }
 }
diff --git storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/IntervalDayTimeColumnVector.java storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/IntervalDayTimeColumnVector.java
index 39ccea8..38c2990 100644
--- storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/IntervalDayTimeColumnVector.java
+++ storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/IntervalDayTimeColumnVector.java
@@ -345,4 +345,7 @@ public void stringifyValue(StringBuilder buffer, int row) {
       buffer.append("null");
     }
   }
+  public Object getValueForSarg(int elementNum) {
+    throw new UnsupportedOperationException("not supported data type");
+  }
 }
\ No newline at end of file
diff --git storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/ListColumnVector.java storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/ListColumnVector.java
index 66240dd..96f107a 100644
--- storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/ListColumnVector.java
+++ storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/ListColumnVector.java
@@ -116,4 +116,7 @@ public void unFlatten() {
     }
   }
 
+  public Object getValueForSarg(int elementNum) {
+    throw new UnsupportedOperationException("not supported data type");
+  }
 }
diff --git storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/LongColumnVector.java storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/LongColumnVector.java
index 0afe5db..cf220e1 100644
--- storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/LongColumnVector.java
+++ storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/LongColumnVector.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hive.ql.exec.vector;
 
-import java.io.IOException;
 import java.util.Arrays;
 
 /**
@@ -221,4 +220,8 @@ public void ensureSize(int size, boolean preserveData) {
       }
     }
   }
+
+  public Object getValueForSarg(int elementNum) {
+    return new Long(vector[elementNum]);
+  }
 }
diff --git storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/MapColumnVector.java storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/MapColumnVector.java
index e8421e3..c8c5efe 100644
--- storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/MapColumnVector.java
+++ storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/MapColumnVector.java
@@ -128,4 +128,8 @@ public void unFlatten() {
       values.unFlatten();
     }
   }
+
+  public Object getValueForSarg(int elementNum) {
+    throw new UnsupportedOperationException("not supported data type");
+  }
 }
diff --git storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/StructColumnVector.java storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/StructColumnVector.java
index cf07bca..cb9f1dd 100644
--- storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/StructColumnVector.java
+++ storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/StructColumnVector.java
@@ -129,4 +129,8 @@ public void setRepeating(boolean isRepeating) {
       fields[i].setRepeating(isRepeating);
     }
   }
+
+  public Object getValueForSarg(int elementNum) {
+    throw new UnsupportedOperationException("not supported data type");
+  }
 }
diff --git storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/TimestampColumnVector.java storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/TimestampColumnVector.java
index c0dd5ed..17ac325 100644
--- storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/TimestampColumnVector.java
+++ storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/TimestampColumnVector.java
@@ -392,4 +392,9 @@ public void stringifyValue(StringBuilder buffer, int row) {
       buffer.append("null");
     }
   }
-}
\ No newline at end of file
+
+  public Object getValueForSarg(int elementNum) {
+    return asScratchTimestamp(elementNum);
+  }
+}
+
diff --git storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/UnionColumnVector.java storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/UnionColumnVector.java
index 298d588..fcfd429 100644
--- storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/UnionColumnVector.java
+++ storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/UnionColumnVector.java
@@ -18,8 +18,6 @@
 package org.apache.hadoop.hive.ql.exec.vector;
 
-import java.util.Arrays;
-
 /**
  * The representation of a vectorized column of struct objects.
  *
@@ -139,4 +137,8 @@ public void setRepeating(boolean isRepeating) {
       fields[i].setRepeating(isRepeating);
     }
   }
+
+  public Object getValueForSarg(int elementNum) {
+    throw new UnsupportedOperationException("not supported data type");
+  }
 }
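One design note on the two call sites: the non-LLAP path in RecordReaderImpl.nextBatch() hands the picked indices to VectorizedRowBatch.selected and sets selectedInUse, because vectorized operators already iterate batches that way, while OrcEncodedDataConsumer has to compact rows physically since ColumnVectorBatch carries no selection vector. A sketch of the consumer-side convention (illustrative, assuming a populated VectorizedRowBatch named batch):

    for (int i = 0; i < batch.size; ++i) {
      int row = batch.selectedInUse ? batch.selected[i] : i;
      // process row 'row' of each column in batch.cols
    }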