diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index 3fcf0dc064f26d5e85a3a035e4e0b3b29638768c..9e613227b91aec84e83f5b1c185e98ee80c978bb 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@ -283,8 +283,7 @@ protected Void performDataRead() throws IOException, InterruptedException {
       recordReaderTime(startTime);
       return null;
     }
-    counters.setDesc(QueryFragmentCounters.Desc.TABLE,
-        LlapHiveUtils.getDbAndTableNameForMetrics(split.getPath(), false, parts));
+    counters.setDesc(QueryFragmentCounters.Desc.TABLE, cacheTag.getTableName());
     counters.setDesc(QueryFragmentCounters.Desc.FILE,
         split.getPath() + (fileKey == null ? "" : " (" + fileKey + ")"));
     try {
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheContentsTracker.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheContentsTracker.java
index 03e66a8b0305ca1860ae943a58933e00089cbce9..175cbac0dc9427d8775a7a4a7fe210e6041b3753 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheContentsTracker.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestCacheContentsTracker.java
@@ -17,15 +17,14 @@
  */
 package org.apache.hadoop.hive.llap.cache;
 
-import java.util.Arrays;
-import java.util.LinkedList;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.hadoop.hive.common.io.CacheTag;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import static java.util.stream.Collectors.toCollection;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -123,6 +122,27 @@ public void testCacheTagComparison() {
 
   }
 
+  @Test
+  public void testEncodingDecoding() throws Exception {
+    Map<String, String> partDescs = new HashMap<>();
+    partDescs.put("pytha=goras", "a2+b2=c2");
+    CacheTag tag = CacheTag.build("math.rules", partDescs);
+    CacheTag.SinglePartitionCacheTag stag = ((CacheTag.SinglePartitionCacheTag)tag);
+    assertEquals("pytha=goras=a2+b2=c2", stag.partitionDescToString());
+    assertEquals(1, stag.getPartitionDescMap().size());
+    assertEquals("a2+b2=c2", stag.getPartitionDescMap().get("pytha=goras"));
+
+    partDescs.clear();
+    partDescs.put("mutli=one", "one=/1");
+    partDescs.put("mutli=two/", "two=2");
+    tag = CacheTag.build("math.rules", partDescs);
+    CacheTag.MultiPartitionCacheTag mtag = ((CacheTag.MultiPartitionCacheTag)tag);
+    assertEquals("mutli=one=one=/1/mutli=two/=two=2", mtag.partitionDescToString());
+    assertEquals(2, mtag.getPartitionDescMap().size());
+    assertEquals("one=/1", mtag.getPartitionDescMap().get("mutli=one"));
+    assertEquals("two=2", mtag.getPartitionDescMap().get("mutli=two/"));
+  }
+
   private static void compareViceVersa(int expected, CacheTag a, CacheTag b) {
     if (a != null) {
       assertEquals(expected, a.compareTo(b));
@@ -148,8 +168,12 @@ private static LlapCacheableBuffer createMockBuffer(long size, CacheTag cacheTag
 
   private static CacheTag cacheTagBuilder(String dbAndTable, String... partitions) {
     if (partitions != null && partitions.length > 0) {
-      LinkedList<String> parts = Arrays.stream(partitions).collect(toCollection(LinkedList::new));
-      return CacheTag.build(dbAndTable, parts);
+      Map<String, String> partDescs = new HashMap<>();
+      for (String partition : partitions) {
+        String[] partDesc = partition.split("=");
+        partDescs.put(partDesc[0], partDesc[1]);
+      }
+      return CacheTag.build(dbAndTable, partDescs);
     } else {
       return CacheTag.build(dbAndTable);
     }
diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/io/CacheTag.java b/storage-api/src/java/org/apache/hadoop/hive/common/io/CacheTag.java
index 47730ccbe3ec9a1b123c53aef2253af88da10c66..3062caa557b8032723cefa01791487eccdf8855b 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/common/io/CacheTag.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/common/io/CacheTag.java
@@ -17,7 +17,10 @@
  */
 package org.apache.hadoop.hive.common.io;
 
-import java.util.LinkedList;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
+import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.commons.lang.StringUtils;
@@ -31,6 +34,9 @@
  * DB/table/1st_partition/.../nth_partition .
  */
 public abstract class CacheTag implements Comparable<CacheTag> {
+
+  private static final String ENCODING = "UTF-8";
+
   /**
    * Prepended by DB name and '.' .
    */
@@ -79,33 +85,18 @@ public static final CacheTag build(String tableName, Map<String, String> partDes
       throw new IllegalArgumentException();
     }
 
-    LinkedList<String> partDescList = new LinkedList<>();
+    String[] partDescs = new String[partDescMap.size()];
+    int i = 0;
     for (Map.Entry<String, String> entry : partDescMap.entrySet()) {
-      StringBuilder sb = new StringBuilder();
-      sb.append(entry.getKey()).append("=").append(entry.getValue());
-      partDescList.add(sb.toString());
-    }
-
-    if (partDescList.size() == 1) {
-      return new SinglePartitionCacheTag(tableName, partDescList.get(0));
-    } else {
-      // In this case it must be >1
-      return new MultiPartitionCacheTag(tableName, partDescList);
-    }
-  }
-
-  // Assumes elements of partDescList are already in p1=v1 format
-  public static final CacheTag build(String tableName, LinkedList<String> partDescList) {
-    if (StringUtils.isEmpty(tableName) || partDescList == null || partDescList.isEmpty()) {
-      throw new IllegalArgumentException();
+      partDescs[i++] = encodePartDesc(entry.getKey(), entry.getValue());
     }
 
-    if (partDescList.size() == 1) {
-      return new SinglePartitionCacheTag(tableName, partDescList.get(0));
+    if (partDescs.length == 1) {
+      return new SinglePartitionCacheTag(tableName, partDescs[0]);
     } else {
       // In this case it must be >1
-      return new MultiPartitionCacheTag(tableName, partDescList);
+      return new MultiPartitionCacheTag(tableName, partDescs);
     }
   }
 
@@ -121,13 +112,15 @@ public static final CacheTag createParentCacheTag(CacheTag tag) {
 
     if (tag instanceof MultiPartitionCacheTag) {
       MultiPartitionCacheTag multiPartitionCacheTag = (MultiPartitionCacheTag) tag;
-      if (multiPartitionCacheTag.partitionDesc.size() > 2) {
-        LinkedList<String> subList = new LinkedList<>(multiPartitionCacheTag.partitionDesc);
-        subList.removeLast();
+      if (multiPartitionCacheTag.partitionDesc.length > 2) {
+        String[] subList = new String[multiPartitionCacheTag.partitionDesc.length - 1];
+        for (int i = 0; i < subList.length; ++i) {
+          subList[i] = multiPartitionCacheTag.partitionDesc[i];
+        }
         return new MultiPartitionCacheTag(multiPartitionCacheTag.tableName, subList);
       } else {
         return new SinglePartitionCacheTag(multiPartitionCacheTag.tableName,
-            multiPartitionCacheTag.partitionDesc.get(0));
+            multiPartitionCacheTag.partitionDesc[0]);
       }
     }
 
@@ -182,6 +175,12 @@ private PartitionCacheTag(String tableName) {
      */
     public abstract String partitionDescToString();
 
+    /**
+     * Returns a map of partition keys and values built from the information of this CacheTag.
+     * @return the map
+     */
+    public abstract Map<String, String> getPartitionDescMap();
+
   }
 
   /**
@@ -201,7 +200,15 @@ private SinglePartitionCacheTag(String tableName, String partitionDesc) {
 
     @Override
     public String partitionDescToString() {
-      return this.partitionDesc;
+      return String.join("=", CacheTag.decodePartDesc(partitionDesc));
+    }
+
+    @Override
+    public Map<String, String> getPartitionDescMap() {
+      Map<String, String> result = new HashMap<>();
+      String[] partition = CacheTag.decodePartDesc(partitionDesc);
+      result.put(partition[0], partition[1]);
+      return result;
     }
 
     @Override
@@ -234,13 +241,15 @@ public int hashCode() {
    */
   public static final class MultiPartitionCacheTag extends PartitionCacheTag {
 
-    private final LinkedList<String> partitionDesc;
+    private final String[] partitionDesc;
 
-    private MultiPartitionCacheTag(String tableName, LinkedList<String> partitionDesc) {
+    private MultiPartitionCacheTag(String tableName, String[] partitionDesc) {
       super(tableName);
-      this.partitionDesc = partitionDesc;
-      if (this.partitionDesc != null && this.partitionDesc.size() > 1) {
-        this.partitionDesc.stream().forEach(p -> p.intern());
+      if (partitionDesc != null && partitionDesc.length > 1) {
+        for (int i = 0; i < partitionDesc.length; ++i) {
+          partitionDesc[i] = partitionDesc[i].intern();
+        }
+        this.partitionDesc = partitionDesc;
       } else {
         throw new IllegalArgumentException();
       }
@@ -259,12 +268,12 @@ public int compareTo(CacheTag o) {
         if (tableNameDiff != 0) {
           return tableNameDiff;
         } else {
-          int sizeDiff = partitionDesc.size() - other.partitionDesc.size();
+          int sizeDiff = partitionDesc.length - other.partitionDesc.length;
           if (sizeDiff != 0) {
             return sizeDiff;
           } else {
-            for (int i = 0; i < partitionDesc.size(); ++i) {
-              int partDiff = partitionDesc.get(i).compareTo(other.partitionDesc.get(i));
+            for (int i = 0; i < partitionDesc.length; ++i) {
+              int partDiff = partitionDesc[i].compareTo(other.partitionDesc[i]);
               if (partDiff != 0) {
                 return partDiff;
               }
@@ -285,9 +294,61 @@ public int hashCode() {
 
     @Override
     public String partitionDescToString() {
-      return String.join("/", this.partitionDesc);
+      StringBuilder sb = new StringBuilder();
+      for (String partDesc : partitionDesc) {
+        String[] partition = CacheTag.decodePartDesc(partDesc);
+        sb.append(partition[0]).append('=').append(partition[1]);
+        sb.append('/');
+      }
+      sb.deleteCharAt(sb.length() - 1);
+      return sb.toString();
     }
 
+    @Override
+    public Map<String, String> getPartitionDescMap() {
+      Map<String, String> result = new HashMap<>();
+      for (String partDesc : partitionDesc) {
+        String[] partition = CacheTag.decodePartDesc(partDesc);
+        result.put(partition[0], partition[1]);
+      }
+      return result;
+    }
+  }
+
+  /**
+   * Combines partition key and value Strings into one by encoding each and concatenating with '=' .
+   * @param partKey
+   * @param partVal
+   * @return
+   */
+  private static String encodePartDesc(String partKey, String partVal) {
+    try {
+      StringBuilder sb = new StringBuilder();
+      sb.append(
+          URLEncoder.encode(partKey, ENCODING))
+          .append('=')
+          .append(URLEncoder.encode(partVal, ENCODING));
+      return sb.toString();
+    } catch (UnsupportedEncodingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Splits and decodes a partition desc String encoded by encodePartDesc().
+   * @param partDesc
+   * @return
+   */
+  private static String[] decodePartDesc(String partDesc) {
+    try {
+      String[] encodedPartDesc = partDesc.split("=");
+      assert encodedPartDesc.length == 2;
+      return new String[] {
+          URLDecoder.decode(encodedPartDesc[0], ENCODING),
+          URLDecoder.decode(encodedPartDesc[1], ENCODING)};
+    } catch (UnsupportedEncodingException e) {
+      throw new RuntimeException(e);
+    }
   }
 }
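
Note on the encoding scheme (illustration only, not part of the patch): the sketch below mirrors what the new private helpers encodePartDesc() / decodePartDesc() do, using only JDK classes. Because the partition key and value are URL-encoded individually before being joined with '=', any literal '=' (or '/') inside them is escaped, so the single remaining unescaped '=' is always the separator and split("=") yields exactly two tokens. The class and method names below (PartDescEncodingDemo, encode, decode) are invented for this example.

// Standalone sketch of the encode/decode scheme used by the patch; JDK only.
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;

public class PartDescEncodingDemo {

  private static final String ENCODING = "UTF-8";

  // Same idea as encodePartDesc(): encode key and value separately, join with '='.
  static String encode(String partKey, String partVal) throws UnsupportedEncodingException {
    return URLEncoder.encode(partKey, ENCODING) + "=" + URLEncoder.encode(partVal, ENCODING);
  }

  // Same idea as decodePartDesc(): the only unescaped '=' left is the separator.
  static String[] decode(String partDesc) throws UnsupportedEncodingException {
    String[] parts = partDesc.split("=");
    return new String[] {
        URLDecoder.decode(parts[0], ENCODING),
        URLDecoder.decode(parts[1], ENCODING)};
  }

  public static void main(String[] args) throws UnsupportedEncodingException {
    // Mirrors testEncodingDecoding(): key and value both contain '=' characters.
    String encoded = encode("pytha=goras", "a2+b2=c2");
    System.out.println(encoded);                           // pytha%3Dgoras=a2%2Bb2%3Dc2
    String[] decoded = decode(encoded);
    System.out.println(decoded[0] + " -> " + decoded[1]);  // pytha=goras -> a2+b2=c2
  }
}

This also appears to be why the old build(String, LinkedList<String>) overload, which assumed callers had already produced "p1=v1" strings, could be removed: ambiguity in keys and values is now resolved at encode time rather than being left to the caller.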