From 682fe4fc7497edaf1196205355c43dfccdebfa19 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=86=AF=E5=AE=87?= Date: Mon, 29 Jan 2018 14:37:39 +0800 Subject: [PATCH] KYLIN-2929, speed up dump performance, write dump file to disk in lazy way --- .../apache/kylin/gridtable/GTAggregateScanner.java | 70 ++++++++++++++++++---- 1 file changed, 57 insertions(+), 13 deletions(-) diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java index 07bec88..ad4a645 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java @@ -18,6 +18,8 @@ package org.apache.kylin.gridtable; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.Closeable; import java.io.DataInputStream; import java.io.DataOutputStream; @@ -25,6 +27,8 @@ import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Comparator; @@ -55,6 +59,7 @@ import com.google.common.collect.Maps; public class GTAggregateScanner implements IGTScanner { private static final Logger logger = LoggerFactory.getLogger(GTAggregateScanner.class); + private static final int MAX_BUFFER_SIZE = 64 * 1024 * 1024; final GTInfo info; final GTScanRequest request; @@ -65,7 +70,7 @@ public class GTAggregateScanner implements IGTScanner { final IGTScanner inputScanner; final BufferedMeasureCodec measureCodec; final AggregationCache aggrCache; - final long spillThreshold; // 0 means no memory control && no spill + long spillThreshold; // 0 means no memory control && no spill final int storagePushDownLimit;//default to be Int.MAX final boolean spillEnabled; @@ -178,6 +183,7 @@ public class GTAggregateScanner implements IGTScanner { final int keyLength; final boolean[] compareMask; boolean compareAll = true; + long sumSpilledSize = 0; final Comparator bytesComparator = new Comparator() { @Override @@ -299,6 +305,18 @@ public class GTAggregateScanner implements IGTScanner { Dump dump = new Dump(aggBufMap, estMemSize); dump.flush(); dumps.add(dump); + sumSpilledSize += dump.size(); + // when spilled data is too much, we can modify it by other strategy. + // this means, all spilled data is bigger than half of original spillThreshold. + if(sumSpilledSize > spillThreshold) { + for(Dump current : dumps) { + current.spill(); + } + spillThreshold += sumSpilledSize; + sumSpilledSize = 0; + } else { + spillThreshold -= dump.size(); + } } catch (Exception e) { throw new RuntimeException("AggregationCache failed to spill", e); } @@ -417,7 +435,7 @@ public class GTAggregateScanner implements IGTScanner { final File dumpedFile; SortedMap buffMap; final long estMemSize; - + byte[] spillBuffer; DataInputStream dis; public Dump(SortedMap buffMap, long estMemSize) throws IOException { @@ -433,7 +451,11 @@ public class GTAggregateScanner implements IGTScanner { throw new RuntimeException("Dumped file cannot be found at: " + (dumpedFile == null ? "" : dumpedFile.getAbsolutePath())); } - dis = new DataInputStream(new FileInputStream(dumpedFile)); + if(spillBuffer == null) { + dis = new DataInputStream(new FileInputStream(dumpedFile)); + } else { + dis = new DataInputStream(new ByteArrayInputStream(spillBuffer)); + } final int count = dis.readInt(); return new Iterator>() { int cursorIdx = 0; @@ -469,39 +491,61 @@ public class GTAggregateScanner implements IGTScanner { } } + public void spill() throws IOException { + if(spillBuffer == null) return; + OutputStream ops = new FileOutputStream(dumpedFile); + InputStream ips = new ByteArrayInputStream(spillBuffer); + IOUtils.copy(ips, ops); + spillBuffer = null; + IOUtils.closeQuietly(ips); + IOUtils.closeQuietly(ops); + + logger.info("Spill buffer to disk, location: {}, size = {}.", dumpedFile.getAbsolutePath(), + dumpedFile.length()); + } + + public int size() { + return spillBuffer == null ? 0 : spillBuffer.length; + } + public void flush() throws IOException { logger.info("AggregationCache(size={} est_mem_size={} threshold={}) will spill to {}", buffMap.size(), estMemSize, spillThreshold, dumpedFile.getAbsolutePath()); - + ByteArrayOutputStream baos = new ByteArrayOutputStream(MAX_BUFFER_SIZE); if (buffMap != null) { - DataOutputStream dos = null; + DataOutputStream bos = new DataOutputStream(baos); Object[] aggrResult = null; try { - dos = new DataOutputStream(new FileOutputStream(dumpedFile)); - dos.writeInt(buffMap.size()); + bos.writeInt(buffMap.size()); + for (Entry entry : buffMap.entrySet()) { MeasureAggregators aggs = new MeasureAggregators(entry.getValue()); aggrResult = new Object[metrics.trueBitCount()]; aggs.collectStates(aggrResult); ByteBuffer metricsBuf = measureCodec.encode(aggrResult); - dos.writeInt(entry.getKey().length); - dos.write(entry.getKey()); - dos.writeInt(metricsBuf.position()); - dos.write(metricsBuf.array(), 0, metricsBuf.position()); + + bos.writeInt(entry.getKey().length); + bos.write(entry.getKey()); + bos.writeInt(metricsBuf.position()); + bos.write(metricsBuf.array(), 0, metricsBuf.position()); } } finally { buffMap = null; - IOUtils.closeQuietly(dos); + IOUtils.closeQuietly(bos); } } + spillBuffer = baos.toByteArray(); + IOUtils.closeQuietly(baos); + logger.info("Accurately spill data size = {}", spillBuffer.length); } public void terminate() throws IOException { buffMap = null; if (dis != null) - dis.close(); + IOUtils.closeQuietly(dis); if (dumpedFile != null && dumpedFile.exists()) dumpedFile.delete(); + spillBuffer = null; } } -- 1.9.4.msysgit.2