diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 7a81612..3300fdc 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -3152,6 +3152,9 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal LLAP_IO_USE_FILEID_PATH("hive.llap.io.use.fileid.path", true, "Whether LLAP should use fileId (inode)-based path to ensure better consistency for the\n" + "cases of file overwrites. This is supported on HDFS."), + LLAP_IO_PENDING_DATA_BUFFER_CAPACITY("hive.llap.io.pending.data.buffer.capacity", 10000, + "When LLAP IO is enabled, defines the capacity of the buffer that is used by the LLAP record reader. In case of fast\n" + + "IO and slow processing, this bounds the amount of buffered data that is pending for processing."), // Restricted to text for now as this is a new feature; only text files can be sliced. LLAP_IO_ENCODE_ENABLED("hive.llap.io.encode.enabled", true, "Whether LLAP should try to re-encode and cache data for non-ORC formats. 
This is used\n" + diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java index 5f010be..873e94b 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java @@ -21,9 +21,9 @@ import java.util.ArrayList; import java.io.IOException; -import java.util.LinkedList; import java.util.List; import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingDeque; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hive.conf.HiveConf; @@ -72,19 +72,17 @@ private final FileSplit split; private List columnIds; - private final SearchArgument sarg; - private final String[] columnNames; private final VectorizedRowBatchCtx rbCtx; private final Object[] partitionValues; - private final LinkedList pendingData = new LinkedList(); + private final LinkedBlockingDeque pendingData; private ColumnVectorBatch lastCvb = null; private boolean isFirst = true; private Throwable pendingError = null; /** Vector that is currently being processed by our user. 
*/ private boolean isDone = false; - private final boolean isClosed = false; + private boolean isClosed = false; private final ConsumerFeedback feedback; private final QueryFragmentCounters counters; private long firstReturnTime; @@ -121,8 +119,10 @@ private LlapRecordReader(MapWork mapWork, JobConf job, FileSplit split, this.executor = executor; this.jobConf = job; this.split = split; - this.sarg = ConvertAstToSearchArg.createFromConf(job); - this.columnNames = ColumnProjectionUtils.getReadColumnNames(job); + final SearchArgument sarg = ConvertAstToSearchArg.createFromConf(job); + final String[] columnNames = ColumnProjectionUtils.getReadColumnNames(job); + int capacity = HiveConf.getIntVar(jobConf, ConfVars.LLAP_IO_PENDING_DATA_BUFFER_CAPACITY); + this.pendingData = new LinkedBlockingDeque<>(capacity); final String fragmentId = LlapTezUtils.getFragmentId(job); final String dagId = LlapTezUtils.getDagId(job); final String queryId = HiveConf.getVar(job, HiveConf.ConfVars.HIVEQUERYID); @@ -362,7 +362,7 @@ ColumnVectorBatch nextCvb() throws InterruptedException, IOException { LlapIoImpl.LOG.trace("next is unblocked"); } rethrowErrorIfAny(); - lastCvb = pendingData.poll(); + lastCvb = pendingData.take(); } if (LlapIoImpl.LOG.isTraceEnabled() && lastCvb != null) { LlapIoImpl.LOG.trace("Processing will receive vector {}", lastCvb); @@ -395,6 +395,7 @@ public void close() throws IOException { LlapIoImpl.LOG.trace("close called; closed {}, done {}, err {}, pending {}", isClosed, isDone, pendingError, pendingData.size()); } + isClosed = true; LlapIoImpl.LOG.info("Llap counters: {}" ,counters); // This is where counters are logged! 
feedback.stop(); rethrowErrorIfAny(); @@ -427,12 +428,16 @@ public void consumeData(ColumnVectorBatch data) { LlapIoImpl.LOG.trace("consume called; closed {}, done {}, err {}, pending {}", isClosed, isDone, pendingError, pendingData.size()); } - synchronized (pendingData) { - if (isClosed) { - return; + try { + synchronized (pendingData) { + if (isClosed) { + return; + } + pendingData.put(data); + pendingData.notifyAll(); } - pendingData.add(data); - pendingData.notifyAll(); + } catch (InterruptedException e) { + setError(e); } }