From 6887d72bd6315751063190bf34effbdc1f5a16f1 Mon Sep 17 00:00:00 2001
From: wangxiaoyu8 <wangxiaoyu1@jd.com>
Date: Mon, 9 Nov 2015 14:34:43 +0800
Subject: [PATCH] KYLIN-1122 Kylin support detail data query from fact table

---
 .../org/apache/kylin/cube/kv/RowValueDecoder.java  |  8 +--
 .../cube/model/validation/rule/FunctionRule.java   |  4 +-
 .../kylin/metadata/measure/MeasureAggregator.java  |  6 +-
 .../kylin/metadata/measure/MeasureSerializer.java  | 20 +++++-
 .../kylin/metadata/measure/StringSerializer.java   | 82 ++++++++++++++++++++++
 .../metadata/measure/StringValueAggregator.java    | 70 ++++++++++++++++++
 .../org/apache/kylin/metadata/model/DataType.java  |  2 +
 .../apache/kylin/metadata/model/FunctionDesc.java  | 11 ++-
 .../kylin/query/enumerator/CubeEnumerator.java     | 22 ++++--
 .../java/org/apache/kylin/storage/tuple/Tuple.java | 11 +++
 webapp/app/js/controllers/cubeSchema.js            |  1 +
 webapp/app/js/model/cubeConfig.js                  |  2 +-
 12 files changed, 220 insertions(+), 19 deletions(-)
 create mode 100644 metadata/src/main/java/org/apache/kylin/metadata/measure/StringSerializer.java
 create mode 100644 metadata/src/main/java/org/apache/kylin/metadata/measure/StringValueAggregator.java

diff --git a/cube/src/main/java/org/apache/kylin/cube/kv/RowValueDecoder.java b/cube/src/main/java/org/apache/kylin/cube/kv/RowValueDecoder.java
index f90a88d..87f6593 100644
--- a/cube/src/main/java/org/apache/kylin/cube/kv/RowValueDecoder.java
+++ b/cube/src/main/java/org/apache/kylin/cube/kv/RowValueDecoder.java
@@ -24,10 +24,7 @@ import java.util.BitSet;
 import java.util.Collection;
 import java.util.List;
 
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.*;
 import org.apache.kylin.cube.model.HBaseColumnDesc;
 import org.apache.kylin.metadata.measure.MeasureCodec;
 import org.apache.kylin.metadata.model.FunctionDesc;
@@ -88,7 +85,8 @@ public class RowValueDecoder implements Cloneable {
                 o = ((DoubleWritable) o).get();
             else if (o instanceof FloatWritable)
                 o = ((FloatWritable) o).get();
-
+            else if (o instanceof Text)
+                o = new String(((Text) o).getBytes(),0,((Text) o).getLength());
             results[i] = o;
         }
     }
diff --git a/cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java b/cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java
index a271ab5..64e67f8 100644
--- a/cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java
+++ b/cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java
@@ -122,7 +122,9 @@ public class FunctionRule implements IValidatorRule<CubeDesc> {
             if (rtype.isNumberFamily() == false) {
                 context.addResult(ResultLevel.ERROR, "Return type for function " + func + " must be one of " + DataType.NUMBER_FAMILY);
             }
-        } else {
+        } else if (funcDesc.isValue()) {
+
+        }  else {
             if (StringUtils.equalsIgnoreCase(KylinConfig.getInstanceFromEnv().getProperty(KEY_IGNORE_UNKNOWN_FUNC, "false"), "false")) {
                 context.addResult(ResultLevel.ERROR, "Unrecognized function: [" + func + "]");
             }
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregator.java b/metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregator.java
index 6abf4af..4f87f4a 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregator.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregator.java
@@ -54,6 +54,10 @@ abstract public class MeasureAggregator<V> {
                 return new BigDecimalMinAggregator();
             else if (isDouble(returnType))
                 return new DoubleMinAggregator();
+        } else if (FunctionDesc.FUNC_VALUE.equalsIgnoreCase(funcName)) {
+            StringValueAggregator sva = new StringValueAggregator();
+            sva.setReturnType(returnType);
+            return sva;
         }
         throw new IllegalArgumentException("No aggregator for func '" + funcName + "' and return type '" + returnType + "'");
     }
@@ -67,7 +71,7 @@ abstract public class MeasureAggregator<V> {
     }
 
     public static boolean isInteger(String type) {
-        return "long".equalsIgnoreCase(type) || "bigint".equalsIgnoreCase(type) || "int".equalsIgnoreCase(type) || "integer".equalsIgnoreCase(type);
+        return "long".equalsIgnoreCase(type) || "bigint".equalsIgnoreCase(type) || "int".equalsIgnoreCase(type) || "integer".equalsIgnoreCase(type) || "smallint".equalsIgnoreCase(type);
     }
 
     public static int guessBigDecimalMemBytes() {
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureSerializer.java b/metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureSerializer.java
index 2f2e39c..7db2492 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureSerializer.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureSerializer.java
@@ -28,6 +28,15 @@ import org.apache.kylin.metadata.model.DataType;
  * 
  */
 abstract public class MeasureSerializer<T> implements BytesSerializer<T> {
+    private String dataType;
+
+    public String getDataType() {
+        return dataType;
+    }
+
+    public void setDataType(String dataType) {
+        this.dataType = dataType;
+    }
 
     final static HashMap<String, Class<?>> implementations = new HashMap<String, Class<?>>();
     static {
@@ -38,7 +47,14 @@ abstract public class MeasureSerializer<T> implements BytesSerializer<T> {
         implementations.put("long", LongSerializer.class);
         implementations.put("integer", LongSerializer.class);
         implementations.put("int", LongSerializer.class);
+        implementations.put("string", StringSerializer.class);
+        implementations.put("varchar", StringSerializer.class);
+        implementations.put("smallint", LongSerializer.class);
+        implementations.put("date", StringSerializer.class);
+        implementations.put("datetime", StringSerializer.class);
+        implementations.put("timestamp", StringSerializer.class);
     }
+    //TIMESTAMP,SMALLINT,DATE
 
     public static MeasureSerializer<?> create(String dataType) {
         DataType type = DataType.getInstance(dataType);
@@ -51,7 +67,9 @@ abstract public class MeasureSerializer<T> implements BytesSerializer<T> {
             throw new RuntimeException("No MeasureSerializer for type " + dataType);
 
         try {
-            return (MeasureSerializer<?>) clz.newInstance();
+            MeasureSerializer<?> ms = (MeasureSerializer<?>) clz.newInstance();
+            ms.setDataType(dataType);
+            return ms;
         } catch (Exception e) {
             throw new RuntimeException(e); // never happen
         }
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/measure/StringSerializer.java b/metadata/src/main/java/org/apache/kylin/metadata/measure/StringSerializer.java
new file mode 100644
index 0000000..15f5253
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kylin/metadata/measure/StringSerializer.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.metadata.measure;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.common.util.BytesUtil;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+
+/**
+ * Created by wangxiaoyu on 15-4-29.
+ */
+public class StringSerializer extends MeasureSerializer{
+    Object current = new Text();
+    @Override
+    public Object valueOf(byte[] value) {
+        String dataType = super.getDataType();
+        MeasureSerializer<?> ms = create(dataType);
+        if(ms instanceof StringSerializer){
+            if (value != null) {
+                ((Text)current).set(value);
+            }
+        } else {
+            current = ms.valueOf(value);
+        }
+        return current;
+    }
+
+    @Override
+    public void serialize(Object value, ByteBuffer out) {
+        if(value instanceof BigDecimal){
+            BigDecimalSerializer bigDecimalSerializer = new BigDecimalSerializer();
+            bigDecimalSerializer.serialize((BigDecimal)value,out);
+        } else if (value instanceof DoubleWritable){
+            DoubleSerializer doubleSerializer = new DoubleSerializer();
+            doubleSerializer.serialize((DoubleWritable)value,out);
+        } else if (value instanceof LongWritable){
+            LongSerializer longSerializer = new LongSerializer();
+            longSerializer.serialize((LongWritable)value,out);
+        } else {
+            Text v = (Text)value;
+            int length = v.getLength();
+            BytesUtil.writeVInt(length, out);
+            out.put(v.getBytes(),0,length);
+        }
+
+    }
+
+    @Override
+    public Object deserialize(ByteBuffer in) {
+        String dataType = super.getDataType();
+        MeasureSerializer<?> ms = create(dataType);
+        if(ms instanceof StringSerializer){
+            int length = BytesUtil.readVInt(in);
+            byte[] bytes = new byte[length];
+            in.get(bytes);
+            ((Text)current).set(bytes);
+        } else {
+            current = ms.deserialize(in);
+        }
+        return current;
+    }
+}
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/measure/StringValueAggregator.java b/metadata/src/main/java/org/apache/kylin/metadata/measure/StringValueAggregator.java
new file mode 100644
index 0000000..a6cc728
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kylin/metadata/measure/StringValueAggregator.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.metadata.measure;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+
+import java.math.BigDecimal;
+
+
+/**
+ * Created by wangxiaoyu on 15-4-29.
+ */
+public class StringValueAggregator extends MeasureAggregator<Object>{
+    private String returnType;
+
+    public void setReturnType(String returnType) {
+        this.returnType = returnType;
+    }
+    Object value = null;
+    @Override
+    public void reset() {
+        this.value = null;
+    }
+
+    @Override
+    public void aggregate(Object value) {
+        if (value != null) {
+            this.value = value;
+            if (isInteger(returnType)) {
+                ((LongWritable) this.value).set(((LongWritable) value).get());
+            } else if (isDouble(returnType)) {
+                ((DoubleWritable) this.value).set(((DoubleWritable) value).get());
+            } else if (isBigDecimal(returnType)) {
+                this.value = new BigDecimal(value.toString());
+            } else {
+                ((Text)this.value).set((Text)value);
+            }
+        }
+    }
+
+    @Override
+    public Object getState() {
+        return this.value;
+    }
+
+    @Override
+    public int getMemBytes() {
+        return 4 // ref
+            + 256;
+    }
+}
+
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/model/DataType.java b/metadata/src/main/java/org/apache/kylin/metadata/model/DataType.java
index d813d6f..be501ca 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/model/DataType.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/model/DataType.java
@@ -183,6 +183,8 @@ public class DataType {
             return 8;
         } else if (isHLLC()) {
             return 1 << precision;
+        } else if (isStringFamily()) {
+            return 256;//estimate
         }
         throw new IllegalStateException("The return type : " + name + " is not recognized;");
     }
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java b/metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
index 18be936..33ac9fa 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
@@ -36,6 +36,7 @@ public class FunctionDesc {
     public static final String FUNC_MAX = "MAX";
     public static final String FUNC_COUNT = "COUNT";
     public static final String FUNC_COUNT_DISTINCT = "COUNT_DISTINCT";
+    public static final String FUNC_VALUE = "VALUE";
 
     public static final String PARAMTER_TYPE_CONSTANT = "constant";
     public static final String PARAMETER_TYPE_COLUMN = "column";
@@ -51,7 +52,7 @@ public class FunctionDesc {
     private boolean isDimensionAsMetric = false;
 
     public String getRewriteFieldName() {
-        if (isSum()) {
+        if (isSum() || isValue()) {
             return getParameter().getValue();
         } else if (isCount()) {
             return "COUNT__"; // ignores parameter, count(*), count(1),
@@ -62,7 +63,7 @@ public class FunctionDesc {
     }
 
     public boolean needRewrite() {
-        return !isSum() && !isHolisticCountDistinct() && !isDimensionAsMetric();
+        return !isValue() && !isSum() && !isHolisticCountDistinct() && !isDimensionAsMetric();
     }
 
     public boolean isMin() {
@@ -93,6 +94,10 @@ public class FunctionDesc {
         }
     }
 
+    public boolean isValue() {
+        return FUNC_VALUE.equalsIgnoreCase(expression);
+    }
+
     /**
      * Get Full Expression such as sum(amount), count(1), count(*)...
      */
@@ -133,7 +138,7 @@ public class FunctionDesc {
     public DataType getSQLType() {
         if (isCountDistinct())
             return DataType.ANY;
-        else if (isSum() || isMax() || isMin())
+        else if (isSum() || isMax() || isMin() || isValue())
             return parameter.getColRefs().get(0).getType();
         else
             return returnDataType;
diff --git a/query/src/main/java/org/apache/kylin/query/enumerator/CubeEnumerator.java b/query/src/main/java/org/apache/kylin/query/enumerator/CubeEnumerator.java
index 66a4035..c02c6ac 100644
--- a/query/src/main/java/org/apache/kylin/query/enumerator/CubeEnumerator.java
+++ b/query/src/main/java/org/apache/kylin/query/enumerator/CubeEnumerator.java
@@ -211,7 +211,7 @@ public class CubeEnumerator implements Enumerator<Object[]> {
             if (cube.getAllDimensions().contains(col)) {
                 sqlDigest.groupbyColumns.add(col);
             }
-            // For measure columns, take them as metric columns with aggregation function SUM().
+            // For measure columns, take them as metric columns with aggregation function SUM() or VALUE().
             else {
                 ParameterDesc colParameter = new ParameterDesc();
                 colParameter.setType("column");
@@ -220,17 +220,25 @@ public class CubeEnumerator implements Enumerator<Object[]> {
                 sumFunc.setExpression("SUM");
                 sumFunc.setParameter(colParameter);
 
-                boolean measureHasSum = false;
+                FunctionDesc valueFunc = new FunctionDesc();
+                valueFunc.setExpression("VALUE");
+                valueFunc.setParameter(colParameter);
+
+                boolean measureHasFun = false;
                 for (MeasureDesc colMeasureDesc : cube.getMeasures()) {
                     if (colMeasureDesc.getFunction().equals(sumFunc)) {
-                        measureHasSum = true;
+                        measureHasFun = true;
+                        sqlDigest.aggregations.add(sumFunc);
+                        break;
+                    }
+                    if (colMeasureDesc.getFunction().equals(valueFunc)) {
+                        measureHasFun = true;
+                        sqlDigest.aggregations.add(valueFunc);
                         break;
                     }
                 }
-                if (measureHasSum) {
-                    sqlDigest.aggregations.add(sumFunc);
-                } else {
-                    logger.warn("SUM is not defined for measure column " + col + ", output will be meaningless.");
+                if (!measureHasFun) {
+                    logger.warn("SUM or VALUE is not defined for measure column " + col + ", output will be meaningless.");
                 }
 
                 sqlDigest.metricColumns.add(col);
diff --git a/storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java b/storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java
index 556ddca..877455a 100644
--- a/storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java
+++ b/storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java
@@ -102,6 +102,17 @@ public class Tuple implements ITuple {
             fieldValue = ((Number) fieldValue).intValue();
         } else if ("float".equals(dataType) && fieldValue instanceof BigDecimal) {
             fieldValue = ((BigDecimal) fieldValue).floatValue();
+        } else if (("time".equals(dataType)
+                || "datetime".equals(dataType) || "timestamp".equals(dataType))
+                && fieldValue instanceof String) {
+            fieldValue = Long.valueOf(DateFormat.stringToMillis((String)fieldValue));
+        } else if ("date".equals(dataType) && fieldValue instanceof String) {
+            Date dateValue = DateFormat.stringToDate((String)fieldValue);
+            long millis = dateValue.getTime();
+            long days = millis / (1000 * 3600 * 24);
+            fieldValue = Integer.valueOf((int) days);
+        } else if ("smallint".equals(dataType) && fieldValue instanceof Long) {
+            fieldValue = Short.valueOf(fieldValue.toString());
         }
 
         setFieldObjectValue(fieldName, fieldValue);
diff --git a/webapp/app/js/controllers/cubeSchema.js b/webapp/app/js/controllers/cubeSchema.js
index bbcc1fb..4098601 100755
--- a/webapp/app/js/controllers/cubeSchema.js
+++ b/webapp/app/js/controllers/cubeSchema.js
@@ -160,6 +160,7 @@ KylinApp.controller('CubeSchemaCtrl', function ($scope, QueryService, UserServic
           break;
         case "MIN":
         case "MAX":
+        case "VALUE":
           $scope.newMeasure.function.returntype = colType;
           break;
         case "COUNT":
diff --git a/webapp/app/js/model/cubeConfig.js b/webapp/app/js/model/cubeConfig.js
index 8662680..95a4344 100644
--- a/webapp/app/js/model/cubeConfig.js
+++ b/webapp/app/js/model/cubeConfig.js
@@ -20,7 +20,7 @@ KylinApp.constant('cubeConfig', {
 
   //~ Define metadata & class
   measureParamType: ['column', 'constant'],
-  measureExpressions: ['SUM', 'MIN', 'MAX', 'COUNT', 'COUNT_DISTINCT'],
+  measureExpressions: ['SUM', 'MIN', 'MAX', 'COUNT', 'COUNT_DISTINCT', 'VALUE'],
   dimensionDataTypes: ["string", "tinyint", "int", "bigint", "date"],
   cubeCapacities: ["SMALL", "MEDIUM", "LARGE"],
 //    cubePartitionTypes : ['APPEND', 'UPDATE_INSERT'],
-- 
1.9.1

