diff --git orc/src/java/org/apache/orc/StripeInformation.java orc/src/java/org/apache/orc/StripeInformation.java index 38f7eba..2d7ac52 100644 --- orc/src/java/org/apache/orc/StripeInformation.java +++ orc/src/java/org/apache/orc/StripeInformation.java @@ -56,4 +56,10 @@ * @return a count of the number of rows */ long getNumberOfRows(); + + /** + * get the heap size used by this object + * @return + */ + int getObjectSize(); } diff --git orc/src/java/org/apache/orc/StripeStatistics.java orc/src/java/org/apache/orc/StripeStatistics.java index 8fc91cb..7244fc5 100644 --- orc/src/java/org/apache/orc/StripeStatistics.java +++ orc/src/java/org/apache/orc/StripeStatistics.java @@ -25,6 +25,8 @@ public class StripeStatistics { private final List cs; + private int objectSize =- 1; + public StripeStatistics(List list) { this.cs = list; } @@ -41,4 +43,16 @@ public StripeStatistics(List list) { } return result; } + + public synchronized int getObjectSize () { + if (objectSize != -1) { + return objectSize; + } + if (cs != null) { + for (OrcProto.ColumnStatistics ocs : cs) { + objectSize += ocs.getSerializedSize(); + } + } + return objectSize; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index eae281c..b3ae443 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -38,6 +38,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import com.google.common.cache.Weigher; import org.apache.commons.codec.binary.Hex; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.io.IOConstants; @@ -1385,6 +1386,7 @@ private static void scheduleSplits(ETLSplitStrategy splitStrategy, Context conte private final List fileStats; private final List types; private final OrcFile.WriterVersion writerVersion; + private int weight = -1; FileInfo(long modificationTime, long size, List stripeInfos, @@ -1401,6 +1403,31 @@ private static void scheduleSplits(ETLSplitStrategy splitStrategy, Context conte this.fileStats = fileStats; this.writerVersion = writerVersion; } + + private synchronized int getWeight(){ + if (weight != -1) { + return weight; + } + + if (stripeInfos != null) { + for (StripeInformation si :stripeInfos ) { + weight += si.getObjectSize(); + } + } + + if (stripeStats != null) { + for (StripeStatistics ss : stripeStats) { + weight += ss.getObjectSize(); + } + } + + if (fileStats != null) { + for (OrcProto.ColumnStatistics cs : fileStats) { + weight += cs.getSerializedSize(); + } + } + return weight; + } } @SuppressWarnings("unchecked") @@ -1778,10 +1805,19 @@ void put(Long fileId, FileStatus file, FileMetaInfo fileMetaInfo, Reader orcRead private final Cache cache; public LocalCache(int numThreads, int cacheStripeDetailsSize) { + // cache should never take up more than 50% of heap memory + long maxCacheHeapSize = Runtime.getRuntime().maxMemory() / 2; + LOG.info("Maximum Weight for orc footerCache : " + maxCacheHeapSize); cache = CacheBuilder.newBuilder() .concurrencyLevel(numThreads) .initialCapacity(cacheStripeDetailsSize) - .maximumSize(cacheStripeDetailsSize) + .maximumWeight(maxCacheHeapSize) + .weigher(new Weigher() { + @Override + public int weigh(Path path, FileInfo fileInfo) { + return fileInfo.getWeight(); + } + }) .softValues() .build(); } diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java index 30c2fad..1f44eae 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java @@ -95,6 +95,7 @@ public static class StripeInformationImpl implements StripeInformation { private final OrcProto.StripeInformation stripe; + private int objectSize = -1; public StripeInformationImpl(OrcProto.StripeInformation stripe) { this.stripe = stripe; @@ -131,6 +132,15 @@ public long getNumberOfRows() { } @Override + public synchronized int getObjectSize() { + if (objectSize != -1) { + return objectSize; + } + objectSize = stripe.getSerializedSize(); + return objectSize; + } + + @Override public String toString() { return "offset: " + getOffset() + " data: " + getDataLength() + " rows: " + getNumberOfRows() + " tail: " + getFooterLength() +