From c356a569177533b63733a55a01da6c642c84ff3a Mon Sep 17 00:00:00 2001
From: wangxiaoyu8 <wangxiaoyu1@jd.com>
Date: Mon, 9 Nov 2015 11:00:19 +0800
Subject: [PATCH] KYLIN-1122 Kylin support detail data query from fact table

---
 .../cube/model/validation/rule/FunctionRule.java   |  2 +
 .../gridtable/AggregationCacheMemSizeTest.java     | 10 +--
 .../kylin/gridtable/SimpleInvertedIndexTest.java   |  3 +-
 .../metadata/measure/BigDecimalMaxAggregator.java  | 15 +++--
 .../metadata/measure/BigDecimalMinAggregator.java  | 15 +++--
 .../kylin/metadata/measure/BigDecimalMutable.java  | 69 ++++++++++++++++++++
 .../metadata/measure/BigDecimalSumAggregator.java  | 13 ++--
 .../kylin/metadata/measure/MeasureAggregator.java  | 14 ++++-
 .../kylin/metadata/measure/StringMutable.java      | 68 ++++++++++++++++++++
 .../kylin/metadata/measure/ValueAggregator.java    | 73 ++++++++++++++++++++++
 .../measure/serializer/BigDecimalSerializer.java   | 40 ++++++++----
 .../measure/serializer/StringSerializer.java       | 31 ++++++---
 .../apache/kylin/metadata/model/FunctionDesc.java  | 11 +++-
 .../serializer/BigDecimalSerializerTest.java       | 13 ++--
 .../java/org/apache/kylin/storage/tuple/Tuple.java | 11 ++++
 .../kylin/query/enumerator/OLAPEnumerator.java     | 22 ++++---
 .../kylin/storage/hbase/steps/RowValueDecoder.java |  9 +--
 webapp/app/js/controllers/cubeMeasures.js          |  1 +
 webapp/app/js/model/cubeConfig.js                  |  2 +-
 19 files changed, 349 insertions(+), 73 deletions(-)
 create mode 100644 core-metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalMutable.java
 create mode 100644 core-metadata/src/main/java/org/apache/kylin/metadata/measure/StringMutable.java
 create mode 100644 core-metadata/src/main/java/org/apache/kylin/metadata/measure/ValueAggregator.java

diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java
index 80bd2f7..5e4f199 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java
@@ -122,6 +122,8 @@ public class FunctionRule implements IValidatorRule<CubeDesc> {
             if (rtype.isNumberFamily() == false) {
                 context.addResult(ResultLevel.ERROR, "Return type for function " + func + " must be one of " + DataType.NUMBER_FAMILY);
             }
+        } else if (funcDesc.isValue()) {
+
         } else {
             if (StringUtils.equalsIgnoreCase(KylinConfig.getInstanceFromEnv().getProperty(KEY_IGNORE_UNKNOWN_FUNC, "false"), "false")) {
                 context.addResult(ResultLevel.ERROR, "Unrecognized function: [" + func + "]");
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheMemSizeTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheMemSizeTest.java
index fa7d611..3aedd0b 100644
--- a/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheMemSizeTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheMemSizeTest.java
@@ -25,13 +25,7 @@ import java.util.TreeMap;
 
 import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
 import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.metadata.measure.BigDecimalSumAggregator;
-import org.apache.kylin.metadata.measure.DoubleMutable;
-import org.apache.kylin.metadata.measure.DoubleSumAggregator;
-import org.apache.kylin.metadata.measure.HLLCAggregator;
-import org.apache.kylin.metadata.measure.LongMutable;
-import org.apache.kylin.metadata.measure.LongSumAggregator;
-import org.apache.kylin.metadata.measure.MeasureAggregator;
+import org.apache.kylin.metadata.measure.*;
 import org.junit.Test;
 
 public class AggregationCacheMemSizeTest {
@@ -69,7 +63,7 @@ public class AggregationCacheMemSizeTest {
 
     private BigDecimalSumAggregator newBigDecimalAggr() {
         BigDecimalSumAggregator aggr = new BigDecimalSumAggregator();
-        aggr.aggregate(new BigDecimal("12345678901234567890.123456789"));
+        aggr.aggregate(new BigDecimalMutable(new BigDecimal("12345678901234567890.123456789")));
         return aggr;
     }
 
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleInvertedIndexTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleInvertedIndexTest.java
index 41d75c9..6dc57f7 100644
--- a/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleInvertedIndexTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleInvertedIndexTest.java
@@ -31,6 +31,7 @@ import org.apache.kylin.metadata.filter.LogicalTupleFilter;
 import org.apache.kylin.metadata.filter.TupleFilter;
 import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
 import org.apache.kylin.metadata.measure.LongMutable;
+import org.apache.kylin.metadata.measure.StringMutable;
 import org.apache.kylin.metadata.measure.serializer.StringSerializer;
 import org.apache.kylin.metadata.model.DataType;
 import org.apache.kylin.metadata.model.TblColRef;
@@ -167,7 +168,7 @@ public class SimpleInvertedIndexTest {
         byte[] space = new byte[10];
         ByteBuffer buf = ByteBuffer.wrap(space);
         StringSerializer stringSerializer = new StringSerializer(DataType.getInstance("string"));
-        stringSerializer.serialize("" + id, buf);
+        stringSerializer.serialize(new StringMutable("" + id), buf);
         ByteArray data = new ByteArray(buf.array(), buf.arrayOffset(), buf.position());
         return new ConstantTupleFilter(data);
     }
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalMaxAggregator.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalMaxAggregator.java
index 91713a4..d090078 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalMaxAggregator.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalMaxAggregator.java
@@ -18,14 +18,13 @@
 
 package org.apache.kylin.metadata.measure;
 
-import java.math.BigDecimal;
 
 /**
  */
 @SuppressWarnings("serial")
-public class BigDecimalMaxAggregator extends MeasureAggregator<BigDecimal> {
+public class BigDecimalMaxAggregator extends MeasureAggregator<BigDecimalMutable> {
 
-    BigDecimal max = null;
+    BigDecimalMutable max = null;
 
     @Override
     public void reset() {
@@ -33,15 +32,15 @@ public class BigDecimalMaxAggregator extends MeasureAggregator<BigDecimal> {
     }
 
     @Override
-    public void aggregate(BigDecimal value) {
+    public void aggregate(BigDecimalMutable value) {
         if (max == null)
-            max = value;
-        else if (max.compareTo(value) < 0)
-            max = value;
+            max = new BigDecimalMutable(value.get());
+        else if (max.get().compareTo(value.get()) < 0)
+            max.set(value.get());
     }
 
     @Override
-    public BigDecimal getState() {
+    public BigDecimalMutable getState() {
         return max;
     }
 
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalMinAggregator.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalMinAggregator.java
index 6bbb6a9..5a99e27 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalMinAggregator.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalMinAggregator.java
@@ -18,14 +18,13 @@
 
 package org.apache.kylin.metadata.measure;
 
-import java.math.BigDecimal;
 
 /**
  */
 @SuppressWarnings("serial")
-public class BigDecimalMinAggregator extends MeasureAggregator<BigDecimal> {
+public class BigDecimalMinAggregator extends MeasureAggregator<BigDecimalMutable> {
 
-    BigDecimal max = null;
+    BigDecimalMutable max = null;
 
     @Override
     public void reset() {
@@ -33,15 +32,15 @@ public class BigDecimalMinAggregator extends MeasureAggregator<BigDecimal> {
     }
 
     @Override
-    public void aggregate(BigDecimal value) {
+    public void aggregate(BigDecimalMutable value) {
         if (max == null)
-            max = value;
-        else if (max.compareTo(value) > 0)
-            max = value;
+            max = new BigDecimalMutable(value.get());
+        else if (max.get().compareTo(value.get()) > 0)
+            max.set(value.get());
     }
 
     @Override
-    public BigDecimal getState() {
+    public BigDecimalMutable getState() {
         return max;
     }
 
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalMutable.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalMutable.java
new file mode 100644
index 0000000..9ed0cbb
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalMutable.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.measure;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+
+@SuppressWarnings("serial")
+public class BigDecimalMutable implements Comparable<BigDecimalMutable>, Serializable {
+
+    private BigDecimal v;
+
+    public BigDecimalMutable() {
+        this(new BigDecimal(0));
+    }
+
+    public BigDecimalMutable(BigDecimal v) {
+        set(v);
+    }
+
+    public BigDecimal get() {
+        return v;
+    }
+
+    public void set(BigDecimal v) {
+        this.v = v;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof BigDecimalMutable)) {
+            return false;
+        }
+        BigDecimalMutable other = (BigDecimalMutable) o;
+        return this.v.equals(other.v);
+    }
+
+    @Override
+    public int hashCode() {
+        return v.hashCode();
+    }
+
+    @Override
+    public int compareTo(BigDecimalMutable o) {
+        return this.v.compareTo(o.v);
+    }
+
+    @Override
+    public String toString() {
+        return v.toString();
+    }
+
+}
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalSumAggregator.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalSumAggregator.java
index f1c60a1..934fe85 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalSumAggregator.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalSumAggregator.java
@@ -18,27 +18,26 @@
 
 package org.apache.kylin.metadata.measure;
 
-import java.math.BigDecimal;
 
 /**
  */
 @SuppressWarnings("serial")
-public class BigDecimalSumAggregator extends MeasureAggregator<BigDecimal> {
+public class BigDecimalSumAggregator extends MeasureAggregator<BigDecimalMutable> {
 
-    BigDecimal sum = new BigDecimal(0);
+    BigDecimalMutable sum = new BigDecimalMutable();
 
     @Override
     public void reset() {
-        sum = new BigDecimal(0);
+        sum = new BigDecimalMutable();
     }
 
     @Override
-    public void aggregate(BigDecimal value) {
-        sum = sum.add(value);
+    public void aggregate(BigDecimalMutable value) {
+        sum.set(sum.get().add(value.get()));
     }
 
     @Override
-    public BigDecimal getState() {
+    public BigDecimalMutable getState() {
         return sum;
     }
 
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregator.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregator.java
index 1b38aa5..5d93c5c 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregator.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregator.java
@@ -58,6 +58,10 @@ abstract public class MeasureAggregator<V> implements Serializable {
                 return new DoubleMinAggregator();
         } else if (FunctionDesc.FUNC_TOP_N.equalsIgnoreCase(funcName)) {
             return new TopNAggregator();
+        } else if (FunctionDesc.FUNC_VALUE.equalsIgnoreCase(funcName)) {
+            ValueAggregator sva = new ValueAggregator();
+            sva.setReturnType(returnType);
+            return sva;
         }
         throw new IllegalArgumentException("No aggregator for func '" + funcName + "' and return type '" + returnType + "'");
     }
@@ -71,7 +75,15 @@ abstract public class MeasureAggregator<V> implements Serializable {
     }
 
     public static boolean isInteger(String type) {
-        return "long".equalsIgnoreCase(type) || "bigint".equalsIgnoreCase(type) || "int".equalsIgnoreCase(type) || "integer".equalsIgnoreCase(type);
+        return "long".equalsIgnoreCase(type) || "bigint".equalsIgnoreCase(type) || "int".equalsIgnoreCase(type) || "integer".equalsIgnoreCase(type) || "smallint".equalsIgnoreCase(type);
+    }
+
+    public static boolean isDate(String type) {
+        return "date".equalsIgnoreCase(type);
+    }
+
+    public static boolean isTimestamp(String type) {
+        return "timestamp".equalsIgnoreCase(type);
     }
 
     public static int guessBigDecimalMemBytes() {
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/StringMutable.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/StringMutable.java
new file mode 100644
index 0000000..8acd29b
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/StringMutable.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.measure;
+
+import java.io.Serializable;
+
+@SuppressWarnings("serial")
+public class StringMutable implements Comparable<StringMutable>, Serializable {
+
+    private String v;
+
+    public StringMutable() {
+        this(new String());
+    }
+
+    public StringMutable(String v) {
+        set(v);
+    }
+
+    public String get() {
+        return v;
+    }
+
+    public void set(String v) {
+        this.v = v;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof StringMutable)) {
+            return false;
+        }
+        StringMutable other = (StringMutable) o;
+        return this.v.equals(other.v);
+    }
+
+    @Override
+    public int hashCode() {
+        return v.hashCode();
+    }
+
+    @Override
+    public int compareTo(StringMutable o) {
+        return this.v.compareTo(o.v);
+    }
+
+    @Override
+    public String toString() {
+        return v;
+    }
+
+}
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/ValueAggregator.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/ValueAggregator.java
new file mode 100644
index 0000000..c1cb348
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/ValueAggregator.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.metadata.measure;
+
+/**
+ * Created by wangxiaoyu on 15-11-5.
+ */
+public class ValueAggregator extends MeasureAggregator<Object>{
+    private String returnType;
+
+    public void setReturnType(String returnType) {
+        this.returnType = returnType;
+    }
+    Object value = null;
+
+    @Override
+    public void reset() {
+        this.value = null;
+    }
+
+    @Override
+    public void aggregate(Object value) {
+        if (value != null) {
+            if (isInteger(returnType) || isDate(returnType) || isTimestamp(returnType)) {
+                if(this.value == null){
+                    this.value = new LongMutable();
+                }
+                ((LongMutable) this.value).set(((LongMutable) value).get());
+            } else if (isDouble(returnType)) {
+                if(this.value == null){
+                    this.value = new DoubleMutable();
+                }
+                ((DoubleMutable) this.value).set(((DoubleMutable) value).get());
+            } else if (isBigDecimal(returnType)) {
+                if(this.value == null){
+                    this.value = new BigDecimalMutable();
+                }
+                ((BigDecimalMutable) this.value).set(((BigDecimalMutable) value).get());
+            } else {
+                if(this.value == null){
+                    this.value = new StringMutable();
+                }
+                ((StringMutable) this.value).set(((StringMutable) value).get());
+            }
+        }
+    }
+
+    @Override
+    public Object getState() {
+        return this.value;
+    }
+
+    @Override
+    public int getMemBytesEstimate() {
+        return 4 // ref
+                + 256;
+    }
+}
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/BigDecimalSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/BigDecimalSerializer.java
index d10c565..741dccb 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/BigDecimalSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/BigDecimalSerializer.java
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
 
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.metadata.measure.BigDecimalMutable;
 import org.apache.kylin.metadata.model.DataType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,7 +33,7 @@ import org.slf4j.LoggerFactory;
  * @author yangli9
  * 
  */
-public class BigDecimalSerializer extends DataTypeSerializer<BigDecimal> {
+public class BigDecimalSerializer extends DataTypeSerializer<BigDecimalMutable> {
 
     private static final Logger logger = LoggerFactory.getLogger(BigDecimalSerializer.class);
 
@@ -41,39 +42,52 @@ public class BigDecimalSerializer extends DataTypeSerializer<BigDecimal> {
 
     int avoidVerbose = 0;
 
+    // be thread-safe and avoid repeated obj creation
+    private ThreadLocal<BigDecimalMutable> current = new ThreadLocal<BigDecimalMutable>();
+
     public BigDecimalSerializer(DataType type) {
         this.type = type;
         // see serialize(): 1 byte scale, 1 byte length, assume every 2 digits takes 1 byte
         this.maxLength = 1 + 1 + (type.getPrecision() + 1) / 2;
     }
 
+    private BigDecimalMutable current() {
+        BigDecimalMutable b = current.get();
+        if (b == null) {
+            b = new BigDecimalMutable();
+            current.set(b);
+        }
+        return b;
+    }
+
     @Override
-    public void serialize(BigDecimal value, ByteBuffer out) {
-        if (value.scale() > type.getScale()) {
+    public void serialize(BigDecimalMutable value, ByteBuffer out) {
+        if (value.get().scale() > type.getScale()) {
             if (avoidVerbose % 10000 == 0) {
                 logger.warn("value's scale has exceeded the " + type.getScale() + ", cut it off, to ensure encoded value do not exceed maxLength " + maxLength + " times:" + (avoidVerbose++));
             }
-            value = value.setScale(type.getScale(), BigDecimal.ROUND_HALF_EVEN);
+            value.set((value.get().setScale(type.getScale(), BigDecimal.ROUND_HALF_EVEN)));
         }
-        byte[] bytes = value.unscaledValue().toByteArray();
+        byte[] bytes = value.get().unscaledValue().toByteArray();
         if (bytes.length + 2 > maxLength) {
             throw new IllegalArgumentException("'" + value + "' exceeds the expected length for type " + type);
         }
 
-        BytesUtil.writeVInt(value.scale(), out);
+        BytesUtil.writeVInt(value.get().scale(), out);
         BytesUtil.writeVInt(bytes.length, out);
         out.put(bytes);
     }
 
     @Override
-    public BigDecimal deserialize(ByteBuffer in) {
+    public BigDecimalMutable deserialize(ByteBuffer in) {
         int scale = BytesUtil.readVInt(in);
         int n = BytesUtil.readVInt(in);
 
         byte[] bytes = new byte[n];
         in.get(bytes);
-
-        return new BigDecimal(new BigInteger(bytes), scale);
+        BigDecimalMutable b = current();
+        b.set(new BigDecimal(new BigInteger(bytes), scale));
+        return b;
     }
 
     @Override
@@ -100,11 +114,13 @@ public class BigDecimalSerializer extends DataTypeSerializer<BigDecimal> {
     }
 
     @Override
-    public BigDecimal valueOf(byte[] value) {
+    public BigDecimalMutable valueOf(byte[] value) {
+        BigDecimalMutable b = current();
         if (value == null)
-            return new BigDecimal(0);
+            b.set(new BigDecimal(0));
         else
-            return new BigDecimal(Bytes.toString(value));
+            b.set(new BigDecimal(Bytes.toString(value)));
+        return b;
     }
 
 }
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/StringSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/StringSerializer.java
index db27ca0..2867de6 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/StringSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/serializer/StringSerializer.java
@@ -4,32 +4,47 @@ import java.nio.ByteBuffer;
 
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.metadata.measure.StringMutable;
 import org.apache.kylin.metadata.model.DataType;
 
-public class StringSerializer extends DataTypeSerializer<String> {
+public class StringSerializer extends DataTypeSerializer<StringMutable> {
 
     final DataType type;
     final int maxLength;
 
+    // be thread-safe and avoid repeated obj creation
+    private ThreadLocal<StringMutable> current = new ThreadLocal<StringMutable>();
+
     public StringSerializer(DataType type) {
         this.type = type;
         // see serialize(): 2 byte length, rest is String.toBytes()
         this.maxLength = 2 + type.getPrecision();
     }
 
+    private StringMutable current() {
+        StringMutable s = current.get();
+        if (s == null) {
+            s = new StringMutable();
+            current.set(s);
+        }
+        return s;
+    }
+
     @Override
-    public void serialize(String value, ByteBuffer out) {
+    public void serialize(StringMutable value, ByteBuffer out) {
         int start = out.position();
 
-        BytesUtil.writeUTFString(value, out);
+        BytesUtil.writeUTFString(value.get(), out);
 
         if (out.position() - start > maxLength)
             throw new IllegalArgumentException("'" + value + "' exceeds the expected length for type " + type);
     }
 
     @Override
-    public String deserialize(ByteBuffer in) {
-        return BytesUtil.readUTFString(in);
+    public StringMutable deserialize(ByteBuffer in) {
+        StringMutable s = current();
+        s.set(BytesUtil.readUTFString(in));
+        return s;
     }
 
     @Override
@@ -48,8 +63,10 @@ public class StringSerializer extends DataTypeSerializer<String> {
     }
 
     @Override
-    public String valueOf(byte[] value) {
-        return Bytes.toString(value);
+    public StringMutable valueOf(byte[] value) {
+        StringMutable s = current();
+        s.set(Bytes.toString(value));
+        return s;
     }
 
 }
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
index d10f395..d020084 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
@@ -35,6 +35,7 @@ public class FunctionDesc {
     public static final String FUNC_COUNT = "COUNT";
     public static final String FUNC_COUNT_DISTINCT = "COUNT_DISTINCT";
     public static final String FUNC_TOP_N = "TOP_N";
+    public static final String FUNC_VALUE = "VALUE";
 
     public static final String PARAMTER_TYPE_CONSTANT = "constant";
     public static final String PARAMETER_TYPE_COLUMN = "column";
@@ -50,7 +51,7 @@ public class FunctionDesc {
     private boolean isDimensionAsMetric = false;
 
     public String getRewriteFieldName() {
-        if (isSum()) {
+        if (isSum() || isValue()) {
             return getParameter().getValue();
         } else if (isCount()) {
             return "COUNT__"; // ignores parameter, count(*), count(1), count(col) are all the same
@@ -60,7 +61,7 @@ public class FunctionDesc {
     }
 
     public boolean needRewrite() {
-        return !isSum() && !isDimensionAsMetric() && !isTopN();
+        return !isValue() && !isSum() && !isDimensionAsMetric() && !isTopN();
     }
 
     public ColumnDesc newFakeRewriteColumn(TableDesc sourceTable) {
@@ -97,6 +98,10 @@ public class FunctionDesc {
         return FUNC_TOP_N.equalsIgnoreCase(expression);
     }
 
+    public boolean isValue() {
+        return FUNC_VALUE.equalsIgnoreCase(expression);
+    }
+
     public boolean isHolisticCountDistinct() {
         if (isCountDistinct() && returnDataType != null && returnDataType.isBigInt()) {
             return true;
@@ -145,7 +150,7 @@ public class FunctionDesc {
     public DataType getSQLType() {
         if (isCountDistinct() || isTopN())
             return DataType.ANY;
-        else if (isSum() || isMax() || isMin())
+        else if (isSum() || isMax() || isMin() || isValue())
             return parameter.getColRefs().get(0).getType();
         else
             return returnDataType;
diff --git a/core-metadata/src/test/java/org/apache/kylin/metadata/measure/serializer/BigDecimalSerializerTest.java b/core-metadata/src/test/java/org/apache/kylin/metadata/measure/serializer/BigDecimalSerializerTest.java
index 682bc24..be81bd3 100644
--- a/core-metadata/src/test/java/org/apache/kylin/metadata/measure/serializer/BigDecimalSerializerTest.java
+++ b/core-metadata/src/test/java/org/apache/kylin/metadata/measure/serializer/BigDecimalSerializerTest.java
@@ -5,6 +5,7 @@ import static org.junit.Assert.assertEquals;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 
+import org.apache.kylin.metadata.measure.BigDecimalMutable;
 import org.apache.kylin.metadata.model.DataType;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -22,29 +23,29 @@ public class BigDecimalSerializerTest {
 
     @Test
     public void testNormal() {
-        BigDecimal input = new BigDecimal("1234.1234");
+        BigDecimalMutable input = new BigDecimalMutable(new BigDecimal("1234.1234"));
         ByteBuffer buffer = ByteBuffer.allocate(256);
         buffer.mark();
         bigDecimalSerializer.serialize(input, buffer);
         buffer.reset();
-        BigDecimal output = bigDecimalSerializer.deserialize(buffer);
+        BigDecimalMutable output = bigDecimalSerializer.deserialize(buffer);
         assertEquals(input, output);
     }
 
     @Test
     public void testScaleOutOfRange() {
-        BigDecimal input = new BigDecimal("1234.1234567890");
+        BigDecimalMutable input = new BigDecimalMutable(new BigDecimal("1234.1234567890"));
         ByteBuffer buffer = ByteBuffer.allocate(256);
         buffer.mark();
         bigDecimalSerializer.serialize(input, buffer);
         buffer.reset();
-        BigDecimal output = bigDecimalSerializer.deserialize(buffer);
-        assertEquals(input.setScale(bigDecimalSerializer.type.getScale(), BigDecimal.ROUND_HALF_EVEN), output);
+        BigDecimalMutable output = bigDecimalSerializer.deserialize(buffer);
+        assertEquals(input.get().setScale(bigDecimalSerializer.type.getScale(), BigDecimal.ROUND_HALF_EVEN), output);
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testOutOfPrecision() {
-        BigDecimal input = new BigDecimal("66855344214907231736.4924");
+        BigDecimalMutable input = new BigDecimalMutable(new BigDecimal("66855344214907231736.4924"));
         ByteBuffer buffer = ByteBuffer.allocate(256);
         bigDecimalSerializer.serialize(input, buffer);
     }
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java b/core-storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java
index 11b03bd..61d9ee5 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java
@@ -27,8 +27,10 @@ import net.sf.ehcache.pool.sizeof.annotations.IgnoreSizeOf;
 import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
 import org.apache.kylin.common.topn.TopNCounter;
 import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.metadata.measure.BigDecimalMutable;
 import org.apache.kylin.metadata.measure.DoubleMutable;
 import org.apache.kylin.metadata.measure.LongMutable;
+import org.apache.kylin.metadata.measure.StringMutable;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.tuple.ITuple;
 
@@ -126,6 +128,11 @@ public class Tuple implements ITuple {
             fieldValue = ((Number) fieldValue).intValue();
         } else if ("float".equals(dataType) && fieldValue instanceof BigDecimal) {
             fieldValue = ((BigDecimal) fieldValue).floatValue();
+        } else if ("date".equals(dataType) && fieldValue instanceof Long) {
+            long millis = Long.valueOf(fieldValue.toString());
+            fieldValue = (int) (millis / (1000 * 3600 * 24));
+        } else if ("smallint".equals(dataType) && fieldValue instanceof Long) {
+            fieldValue = Short.valueOf(fieldValue.toString());
         }
         values[idx] = fieldValue;
     }
@@ -135,6 +142,10 @@ public class Tuple implements ITuple {
             o = ((LongMutable) o).get();
         else if (o instanceof DoubleMutable)
             o = ((DoubleMutable) o).get();
+        else if (o instanceof BigDecimalMutable)
+            o = ((BigDecimalMutable) o).get();
+        else if (o instanceof StringMutable)
+            o = ((StringMutable) o).get();
         return o;
     }
 
diff --git a/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java b/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java
index 07f72b1..616f0a3 100644
--- a/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java
+++ b/query/src/main/java/org/apache/kylin/query/enumerator/OLAPEnumerator.java
@@ -195,7 +195,7 @@ public class OLAPEnumerator implements Enumerator<Object[]> {
             if (cube.getAllDimensions().contains(col)) {
                 sqlDigest.groupbyColumns.add(col);
             }
-            // For measure columns, take them as metric columns with aggregation function SUM().
+            // For measure columns, take them as metric columns with aggregation function SUM() or VALUE().
             else {
                 ParameterDesc colParameter = new ParameterDesc();
                 colParameter.setType("column");
@@ -204,17 +204,25 @@ public class OLAPEnumerator implements Enumerator<Object[]> {
                 sumFunc.setExpression("SUM");
                 sumFunc.setParameter(colParameter);
 
-                boolean measureHasSum = false;
+                FunctionDesc valueFunc = new FunctionDesc();
+                valueFunc.setExpression("VALUE");
+                valueFunc.setParameter(colParameter);
+
+                boolean measureHasFun = false;
                 for (MeasureDesc colMeasureDesc : cube.getMeasures()) {
                     if (colMeasureDesc.getFunction().equals(sumFunc)) {
-                        measureHasSum = true;
+                        measureHasFun = true;
+                        sqlDigest.aggregations.add(sumFunc);
+                        break;
+                    }
+                    if (colMeasureDesc.getFunction().equals(valueFunc)) {
+                        measureHasFun = true;
+                        sqlDigest.aggregations.add(valueFunc);
                         break;
                     }
                 }
-                if (measureHasSum) {
-                    sqlDigest.aggregations.add(sumFunc);
-                } else {
-                    logger.warn("SUM is not defined for measure column " + col + ", output will be meaningless.");
+                if (!measureHasFun) {
+                    logger.warn("SUM or VALUE is not defined for measure column " + col + ", output will be meaningless.");
                 }
 
                 sqlDigest.metricColumns.add(col);
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java
index 7c1a19f..620e0e0 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java
@@ -25,9 +25,7 @@ import java.util.Collection;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.cube.model.HBaseColumnDesc;
-import org.apache.kylin.metadata.measure.DoubleMutable;
-import org.apache.kylin.metadata.measure.LongMutable;
-import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.metadata.measure.*;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 
@@ -89,7 +87,10 @@ public class RowValueDecoder implements Cloneable {
                 o = ((LongMutable) o).get();
             else if (o instanceof DoubleMutable)
                 o = ((DoubleMutable) o).get();
-
+            else if (o instanceof StringMutable)
+                o = ((StringMutable) o).get();
+            else if (o instanceof BigDecimalMutable)
+                o = ((BigDecimalMutable) o).get();
             results[i] = o;
         }
     }
diff --git a/webapp/app/js/controllers/cubeMeasures.js b/webapp/app/js/controllers/cubeMeasures.js
index 1560d23..9ec63a6 100644
--- a/webapp/app/js/controllers/cubeMeasures.js
+++ b/webapp/app/js/controllers/cubeMeasures.js
@@ -85,6 +85,7 @@ KylinApp.controller('CubeMeasuresCtrl', function ($scope, $modal,MetaModel,cubes
           break;
         case "MIN":
         case "MAX":
+        case "VALUE":
           $scope.newMeasure.function.returntype = colType;
           break;
         case "COUNT":
diff --git a/webapp/app/js/model/cubeConfig.js b/webapp/app/js/model/cubeConfig.js
index a647b53..5cce1e4 100644
--- a/webapp/app/js/model/cubeConfig.js
+++ b/webapp/app/js/model/cubeConfig.js
@@ -20,7 +20,7 @@ KylinApp.constant('cubeConfig', {
 
   //~ Define metadata & class
   measureParamType: ['column', 'constant'],
-  measureExpressions: ['SUM', 'MIN', 'MAX', 'COUNT', 'COUNT_DISTINCT'],
+  measureExpressions: ['SUM', 'MIN', 'MAX', 'COUNT', 'COUNT_DISTINCT', 'VALUE'],
   dimensionDataTypes: ["string", "tinyint", "int", "bigint", "date"],
   cubeCapacities: ["SMALL", "MEDIUM", "LARGE"],
 //    cubePartitionTypes : ['APPEND', 'UPDATE_INSERT'],
-- 
1.9.1

