diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleAllocator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleAllocator.java
index d8f59d1..51eb34e 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleAllocator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cache/SimpleAllocator.java
@@ -32,9 +32,10 @@
   private final boolean isDirect;
   private static Field cleanerField;
   static {
-    ByteBuffer tmp = ByteBuffer.allocateDirect(1);
     try {
-      cleanerField = tmp.getClass().getDeclaredField("cleaner");
+      // TODO: To make it work for JDK9 use CleanerUtil from https://issues.apache.org/jira/browse/HADOOP-12760
+      final Class<?> dbClazz = Class.forName("java.nio.DirectByteBuffer");
+      cleanerField = dbClazz.getDeclaredField("cleaner");
       cleanerField.setAccessible(true);
     } catch (Throwable t) {
       LlapIoImpl.LOG.warn("Cannot initialize DirectByteBuffer cleaner", t);
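Reviewer note: the `SimpleAllocator` hunk above swaps the throwaway `ByteBuffer.allocateDirect(1)` probe for a direct `Class.forName("java.nio.DirectByteBuffer")` lookup, so resolving the `cleaner` field no longer allocates native memory during class initialization. A minimal, self-contained sketch of the same pattern (JDK 8 only; the `DirectBufferCleaner` class and `freeDirectBuffer` helper are illustrative names, not part of this patch):

```java
import java.lang.reflect.Field;
import java.nio.ByteBuffer;

import sun.misc.Cleaner; // JDK 8 only; see HADOOP-12760 (CleanerUtil) for JDK 9+

final class DirectBufferCleaner {
  private static Field cleanerField;
  static {
    try {
      // Look the class up by name instead of allocating a probe buffer.
      final Class<?> dbClazz = Class.forName("java.nio.DirectByteBuffer");
      cleanerField = dbClazz.getDeclaredField("cleaner");
      cleanerField.setAccessible(true);
    } catch (Throwable t) {
      cleanerField = null; // reflection unavailable; fall back to GC-driven cleanup
    }
  }

  /** Illustrative helper: eagerly frees the native memory behind a direct buffer. */
  static void freeDirectBuffer(ByteBuffer buf) {
    if (cleanerField == null || !buf.isDirect()) {
      return;
    }
    try {
      ((Cleaner) cleanerField.get(buf)).clean();
    } catch (Exception e) {
      // Leave the buffer for GC to reclaim.
    }
  }
}
```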
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index 6554fa2..34c6824 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -145,6 +145,7 @@ public void resetBeforeOffer(OrcEncodedColumnBatch t) {
   private final QueryFragmentCounters counters;
   private final UserGroupInformation ugi;
   private final SchemaEvolution evolution;
+  private boolean useZeroCopy;

   // Read state.
   private int stripeIxFrom;
@@ -314,7 +315,7 @@ protected Void performDataRead() throws IOException {
       ensureOrcReader();
       // Reader creating updates HDFS counters, don't do it here.
       DataWrapperForOrc dw = new DataWrapperForOrc();
-      stripeReader = orcReader.encodedReader(fileKey, dw, dw, POOL_FACTORY);
+      stripeReader = orcReader.encodedReader(fileKey, dw, dw, POOL_FACTORY, useZeroCopy);
       stripeReader.setTracing(LlapIoImpl.ORC_LOGGER.isTraceEnabled());
     } catch (Throwable t) {
       consumer.setError(t);
@@ -656,7 +657,7 @@ private void ensureMetadataReader() throws IOException {
     ensureOrcReader();
     if (metadataReader != null) return;
     long startTime = counters.startTimeCounter();
-    boolean useZeroCopy = (conf != null) && OrcConf.USE_ZEROCOPY.getBoolean(conf);
+    useZeroCopy = (conf != null) && OrcConf.USE_ZEROCOPY.getBoolean(conf);
     metadataReader = RecordReaderUtils.createDefaultDataReader(
         DataReaderProperties.builder()
         .withBufferSize(orcReader.getCompressionSize())
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
index 0ac3ec5..3167a3b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.io.orc.encoded;

 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -43,11 +44,13 @@
 import org.apache.orc.impl.StreamName;
 import org.apache.orc.StripeInformation;
 import org.apache.orc.impl.BufferChunk;
-import org.apache.hadoop.hive.llap.DebugUtils;
 import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch;
 import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.PoolFactory;
 import org.apache.orc.OrcProto;

+import sun.misc.Cleaner;
+
+
 /**
  * Encoded reader implementation.
  *
@@ -80,6 +83,17 @@
  */
 class EncodedReaderImpl implements EncodedReader {
   public static final Logger LOG = LoggerFactory.getLogger(EncodedReaderImpl.class);
+  private static Field cleanerField;
+  static {
+    try {
+      // TODO: To make it work for JDK9 use CleanerUtil from https://issues.apache.org/jira/browse/HADOOP-12760
+      final Class<?> dbClazz = Class.forName("java.nio.DirectByteBuffer");
+      cleanerField = dbClazz.getDeclaredField("cleaner");
+      cleanerField.setAccessible(true);
+    } catch (Throwable t) {
+      cleanerField = null;
+    }
+  }
   private static final Object POOLS_CREATION_LOCK = new Object();
   private static Pools POOLS;
   private static class Pools {
@@ -105,10 +119,11 @@ public DiskRangeList createCacheChunk(MemoryBuffer buffer, long offset, long end
   private final long rowIndexStride;
   private final DataCache cacheWrapper;
   private boolean isTracingEnabled;
+  private final boolean isZeroCopy;

   public EncodedReaderImpl(Object fileKey, List<OrcProto.Type> types, CompressionCodec codec,
-      int bufferSize, long strideRate, DataCache cacheWrapper, DataReader dataReader,
-      PoolFactory pf) throws IOException {
+      int bufferSize, long strideRate, DataCache cacheWrapper, DataReader dataReader,
+      PoolFactory pf, final boolean useZeroCopy) throws IOException {
     this.fileKey = fileKey;
     this.codec = codec;
     this.types = types;
@@ -116,6 +131,7 @@ public EncodedReaderImpl(Object fileKey, List<OrcProto.Type> types, CompressionC
     this.rowIndexStride = strideRate;
     this.cacheWrapper = cacheWrapper;
     this.dataReader = dataReader;
+    this.isZeroCopy = useZeroCopy;
     if (POOLS != null) return;
     if (pf == null) {
       pf = new NoopPoolFactory();
@@ -683,6 +699,18 @@ public DiskRangeList readEncodedStream(long baseOffset, DiskRangeList start, lon
           copyUncompressedChunk(chunk.originalData, dest);
         }

+        if (!isZeroCopy && chunk.originalData.isDirect()) {
+          if (cleanerField != null) {
+            try {
+              ((Cleaner) cleanerField.get(chunk.originalData)).clean();
+            } catch (Exception e) {
+              // leave it for GC to clean up
+              LOG.warn("Unable to clean direct buffers using Cleaner.");
+              cleanerField = null;
+            }
+          }
+        }
+
         chunk.originalData = null;
         if (isTracingEnabled) {
           LOG.trace("Locking " + chunk.getBuffer() + " due to reuse (after decompression)");
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
index 31b0609..98c11f5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java
@@ -42,10 +42,11 @@
    * @param dataCache Data cache to use for cache lookups.
    * @param dataReader Data reader to read data not found in cache (from disk, HDFS, and such).
    * @param pf Pool factory to create object pools.
+   * @param useZeroCopy Whether the underlying data reader uses zero-copy reads.
    * @return The reader.
    */
   EncodedReader encodedReader(Object fileKey, DataCache dataCache, DataReader dataReader,
-      PoolFactory pf) throws IOException;
+      PoolFactory pf, final boolean useZeroCopy) throws IOException;

   /** The factory that can create (or return) the pools used by encoded reader. */
   public interface PoolFactory {
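Reviewer note: the `useZeroCopy` flag threaded through `encodedReader` above exists to guard the new Cleaner call in `EncodedReaderImpl`. Under HDFS zero-copy reads the direct buffers are borrowed from the DataNode/short-circuit cache and must be returned through the zero-copy release path, so only buffers the reader allocated itself may be freed eagerly. A caller-side sketch of the new signature, mirroring what `OrcEncodedDataReader` now does (variable names illustrative):

```java
// Read the setting once from the ORC configuration, then pass it down so the
// encoded reader knows whether it owns the direct buffers it decompresses.
boolean useZeroCopy = (conf != null) && OrcConf.USE_ZEROCOPY.getBoolean(conf);
EncodedReader stripeReader =
    orcReader.encodedReader(fileKey, dataCache, dataReader, poolFactory, useZeroCopy);
```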
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java
index 4856fb3..5fb42eb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/ReaderImpl.java
@@ -33,9 +33,9 @@ public ReaderImpl(Path path, ReaderOptions options) throws IOException {
   }

   @Override
-  public EncodedReader encodedReader(
-      Object fileKey, DataCache dataCache, DataReader dataReader, PoolFactory pf) throws IOException {
+  public EncodedReader encodedReader(Object fileKey, DataCache dataCache, DataReader dataReader, PoolFactory pf,
+      final boolean useZeroCopy) throws IOException {
     return new EncodedReaderImpl(fileKey, types,
-        codec, bufferSize, rowIndexStride, dataCache, dataReader, pf);
+        codec, bufferSize, rowIndexStride, dataCache, dataReader, pf, useZeroCopy);
   }
 }
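Reviewer note: as the TODO comments say, this reflection path breaks on JDK 9+, where `sun.misc.Cleaner` was removed. A hedged sketch of the JDK 9+ alternative that HADOOP-12760's CleanerUtil builds on, assuming `sun.misc.Unsafe.invokeCleaner(ByteBuffer)` (available since JDK 9); the class and method names here are illustrative, not part of this patch:

```java
import java.lang.reflect.Field;
import java.nio.ByteBuffer;

final class Jdk9BufferCleaner {
  private static final sun.misc.Unsafe UNSAFE;
  static {
    try {
      // Grab the singleton Unsafe instance reflectively.
      Field f = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
      f.setAccessible(true);
      UNSAFE = (sun.misc.Unsafe) f.get(null);
    } catch (ReflectiveOperationException e) {
      throw new ExceptionInInitializerError(e);
    }
  }

  /** Frees a direct buffer's native memory; throws for slices and duplicates. */
  static void free(ByteBuffer directBuffer) {
    if (directBuffer.isDirect()) {
      UNSAFE.invokeCleaner(directBuffer); // JDK 9+ only
    }
  }
}
```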