From de5d1381005b159cc864ce602bf5c5e687174f4b Mon Sep 17 00:00:00 2001
From: xiefan46 <958034172@qq.com>
Date: Thu, 9 Mar 2017 18:25:11 +0800
Subject: [PATCH] KYLIN-2493 Auto enlarge buffer in FactDistinctColumnsMapper

---
 .../engine/mr/steps/FactDistinctColumnsMapper.java | 26 +++++++++++++++++++---
 1 file changed, 23 insertions(+), 3 deletions(-)

diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
index d9c1309..158bd0d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
@@ -48,7 +48,9 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
 
     public static enum RawDataCounter {
         BYTES
-    };
+    }
+
+    ;
 
     protected boolean collectStatistics = false;
     protected CuboidScheduler cuboidScheduler = null;
@@ -153,8 +155,13 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
             }
 
             tmpbuf.clear();
+            byte[] valueBytes = Bytes.toBytes(fieldValue);
+            int size = valueBytes.length + 1;
+            if (size >= tmpbuf.capacity()) {
+                tmpbuf = ByteBuffer.allocate(countNewSize(tmpbuf.capacity(), size));
+            }
             tmpbuf.put(Bytes.toBytes(reducerIndex)[3]);
-            tmpbuf.put(Bytes.toBytes(fieldValue));
+            tmpbuf.put(valueBytes);
             outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
             sortableKey.setText(outputKey);
             //judge type
@@ -176,8 +183,13 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
                 String fieldValue = row[partitionColumnIndex];
                 if (fieldValue != null) {
                     tmpbuf.clear();
+                    byte[] valueBytes = Bytes.toBytes(fieldValue);
+                    int size = valueBytes.length + 1;
+                    if (size >= tmpbuf.capacity()) {
+                        tmpbuf = ByteBuffer.allocate(countNewSize(tmpbuf.capacity(), size));
+                    }
                     tmpbuf.put(MARK_FOR_PARTITION_COL);
-                    tmpbuf.put(Bytes.toBytes(fieldValue));
+                    tmpbuf.put(valueBytes);
                     outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
                     sortableKey.setText(outputKey);
                     sortableKey.setTypeId((byte) 0);
@@ -243,4 +255,12 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
             }
         }
     }
+
+    private int countNewSize(int oldSize, int dataSize) {
+        int newSize = oldSize * 2;
+        while (newSize < dataSize) {
+            newSize = newSize * 2;
+        }
+        return newSize;
+    }
 }
-- 
2.7.4

