diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 8a45b9c..501408e 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -3878,7 +3878,7 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "MR LineRecordRedader into LLAP cache, if this feature is enabled. Safety flag."), LLAP_ORC_ENABLE_TIME_COUNTERS("hive.llap.io.orc.time.counters", true, "Whether to enable time counters for LLAP IO layer (time spent in HDFS, etc.)"), - LLAP_IO_VRB_QUEUE_LIMIT_BASE("hive.llap.io.vrb.queue.limit.base", 10000, + LLAP_IO_VRB_QUEUE_LIMIT_BASE("hive.llap.io.vrb.queue.limit.base", 50000, "The default queue size for VRBs produced by a LLAP IO thread when the processing is\n" + "slower than the IO. The actual queue size is set per fragment, and is adjusted down\n" + "from the base, depending on the schema."), diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java index cb57a11..0000757 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java @@ -58,6 +58,7 @@ import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; +import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.io.NullWritable; @@ -163,7 +164,7 @@ private LlapRecordReader(MapWork mapWork, JobConf job, FileSplit split, int queueLimitBase = getQueueVar(ConfVars.LLAP_IO_VRB_QUEUE_LIMIT_BASE, job, daemonConf); int queueLimitMin = getQueueVar(ConfVars.LLAP_IO_VRB_QUEUE_LIMIT_MIN, job, daemonConf); - int limit = determineQueueLimit(queueLimitBase, queueLimitMin, rbCtx.getRowColumnTypeInfos()); + int limit = determineQueueLimit(queueLimitBase, queueLimitMin, rbCtx.getRowColumnTypeInfos(), job); LOG.info("Queue limit for LlapRecordReader is " + limit); this.queue = new LinkedBlockingQueue<>(limit); @@ -199,14 +200,14 @@ private static int getQueueVar(ConfVars var, JobConf jobConf, Configuration daem private static final int COL_WEIGHT_COMPLEX = 16, COL_WEIGHT_HIVEDECIMAL = 4, COL_WEIGHT_STRING = 8; private static int determineQueueLimit( - int queueLimitBase, int queueLimitMin, TypeInfo[] typeInfos) { + int queueLimitBase, int queueLimitMin, TypeInfo[] typeInfos, final JobConf job) { // If the values are equal, the queue limit is fixed. if (queueLimitBase == queueLimitMin) return queueLimitBase; // If there are no columns (projection only join?) just assume no weight. if (typeInfos == null || typeInfos.length == 0) return queueLimitBase; double totalWeight = 0; for (TypeInfo ti : typeInfos) { - int colWeight = 1; + int colWeight; if (ti.getCategory() != Category.PRIMITIVE) { colWeight = COL_WEIGHT_COMPLEX; } else { @@ -217,8 +218,24 @@ private static int determineQueueLimit( case VARCHAR: case STRING: colWeight = COL_WEIGHT_STRING; + break; case DECIMAL: - colWeight = COL_WEIGHT_HIVEDECIMAL; + boolean useDecimal64 = false; + if (ti instanceof DecimalTypeInfo) { + DecimalTypeInfo dti = (DecimalTypeInfo) ti; + if (dti.getPrecision() <= TypeDescription.MAX_DECIMAL64_PRECISION && + HiveConf.getVar(job, ConfVars.HIVE_VECTORIZED_INPUT_FORMAT_SUPPORTS_ENABLED) + .equalsIgnoreCase("decimal_64")) { + useDecimal64 = true; + } + } + // decimal_64 column vectors gets the same weight as long column vectors + if (useDecimal64) { + colWeight = 1; + } else { + colWeight = COL_WEIGHT_HIVEDECIMAL; + } + break; default: colWeight = 1; }