Index: build-common.xml
===================================================================
--- build-common.xml (revision 736088)
+++ build-common.xml (working copy)
@@ -241,6 +241,7 @@
 -->
+
Index: ql/src/test/results/clientpositive/input3_limit.q.out
===================================================================
--- ql/src/test/results/clientpositive/input3_limit.q.out (revision 736088)
+++ ql/src/test/results/clientpositive/input3_limit.q.out (working copy)
@@ -1,30 +1,66 @@
 ABSTRACT SYNTAX TREE:
-  (TOK_QUERY (TOK_FROM (TOK_TABREF T1 a)) (TOK_INSERT (TOK_DESTINATION (TOK_TAB T2)) (TOK_SELECT (TOK_SELEXPR (TOK_COLREF a key)) (TOK_SELEXPR (TOK_COLREF a value))) (TOK_LIMIT 20)))
+  (TOK_QUERY (TOK_FROM (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_TABREF T1)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)) (TOK_DISTRIBUTEBY key) (TOK_SORTBY (TOK_TABSORTCOLNAMEASC key) (TOK_TABSORTCOLNAMEASC value)))) T)) (TOK_INSERT (TOK_DESTINATION (TOK_TAB T2)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)) (TOK_LIMIT 20)))
 
 STAGE DEPENDENCIES:
   Stage-1 is a root stage
-  Stage-0 depends on stages: Stage-1
+  Stage-2 depends on stages: Stage-1
+  Stage-0 depends on stages: Stage-2
 
 STAGE PLANS:
   Stage: Stage-1
     Map Reduce
       Alias -> Map Operator Tree:
-        a
+        t:t1
           Select Operator
            expressions:
                  expr: key
                  type: string
                  expr: value
                  type: string
-            Limit
-              Reduce Output Operator
-                sort order:
-                tag: -1
-                value expressions:
-                      expr: 0
-                      type: string
-                      expr: 1
-                      type: string
+            Reduce Output Operator
+              key expressions:
+                    expr: 0
+                    type: string
+                    expr: 1
+                    type: string
+              sort order: ++
+              Map-reduce partition columns:
+                    expr: 0
+                    type: string
+              tag: -1
+              value expressions:
+                    expr: 0
+                    type: string
+                    expr: 1
+                    type: string
+      Reduce Operator Tree:
+        Extract
+          Select Operator
+            expressions:
+                  expr: 0
+                  type: string
+                  expr: 1
+                  type: string
+            Limit
+              File Output Operator
+                compressed: false
+                table:
+                    input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                    output format: org.apache.hadoop.mapred.SequenceFileOutputFormat
+                    name: binary_table
+
+  Stage: Stage-2
+    Map Reduce
+      Alias -> Map Operator Tree:
+        /data/users/athusoo/commits/hive_trunk_ws2/build/ql/tmp/80706519/149054896.10001
+          Reduce Output Operator
+            sort order:
+            tag: -1
+            value expressions:
+                  expr: 0
+                  type: string
+                  expr: 1
+                  type: string
       # Reducers: 1
       Reduce Operator Tree:
         Extract
@@ -48,23 +84,23 @@
                 name: t2
 
-128 val_128
-150 val_150
-165 val_165
-193 val_193
-213 val_213
-224 val_224
-238 val_238
-255 val_255
-265 val_265
-27 val_27
-273 val_273
-278 val_278
-311 val_311
-369 val_369
-401 val_401
-409 val_409
-484 val_484
-66 val_66
-86 val_86
-98 val_98
+0 val_0
+0 val_0
+0 val_0
+0 val_1
+0 val_1
+1 val_2
+10 val_10
+10 val_11
+100 val_100
+100 val_100
+100 val_101
+100 val_101
+101 val_102
+102 val_103
+103 val_103
+103 val_103
+104 val_104
+104 val_104
+104 val_105
+104 val_105
Index: ql/src/test/queries/clientpositive/input3_limit.q
===================================================================
--- ql/src/test/queries/clientpositive/input3_limit.q (revision 736088)
+++ ql/src/test/queries/clientpositive/input3_limit.q (working copy)
@@ -7,12 +7,11 @@
 CREATE TABLE T2(key STRING, value STRING);
 
 EXPLAIN
-INSERT OVERWRITE TABLE T2 SELECT a.key, a.value from T1 a LIMIT 20;
+INSERT OVERWRITE TABLE T2 SELECT * FROM (SELECT * FROM T1 DISTRIBUTE BY key SORT BY key, value) T LIMIT 20;
 
+INSERT OVERWRITE TABLE T2 SELECT * FROM (SELECT * FROM T1 DISTRIBUTE BY key SORT BY key, value) T LIMIT 20;
-INSERT OVERWRITE TABLE T2
-SELECT a.key, a.value from T1 a LIMIT 20;
+SELECT * FROM T2;
-SELECT * FROM (SELECT * FROM T2 DISTRIBUTE BY key SORT BY key, value) T;
-
 DROP TABLE T1;
 DROP TABLE T2;
Index: ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java (revision 736088)
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java (working copy)
@@ -22,6 +22,8 @@
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.regex.Matcher;
@@ -41,126 +43,140 @@
  */
 public class Partition {
 
-  @SuppressWarnings("nls")
-  static final private Log LOG = LogFactory.getLog("hive.ql.metadata.Partition");
+  /**
+   * Class to compare partitions by the path name. This class is used to sort
+   * the partitions by name in getBucketPath.
+   */
+  public class PathNameComparator implements Comparator<FileStatus> {
-  private Table table;
-  private org.apache.hadoop.hive.metastore.api.Partition tPartition;
   /**
-   * @return the tPartition
+   * The comparison function.
    */
-  public org.apache.hadoop.hive.metastore.api.Partition getTPartition() {
-    return tPartition;
+  public int compare(FileStatus f1, FileStatus f2) {
+    return f1.getPath().compareTo(f2.getPath());
   }
+  }
-  private LinkedHashMap<String, String> spec;
-
-  /**
-   * @return
-   * @see org.apache.hadoop.hive.metastore.api.Partition#getValues()
-   */
-  public List<String> getValues() {
-    return tPartition.getValues();
-  }
+  @SuppressWarnings("nls")
+  static final private Log LOG = LogFactory.getLog("hive.ql.metadata.Partition");
-  private Path partPath;
-  private URI partURI;
+  private Table table;
+  private org.apache.hadoop.hive.metastore.api.Partition tPartition;
+  /**
+   * @return the tPartition
+   */
+  public org.apache.hadoop.hive.metastore.api.Partition getTPartition() {
+    return tPartition;
+  }
-  public Partition(Table tbl, org.apache.hadoop.hive.metastore.api.Partition tp) throws HiveException {
-    this.table = tbl;
-    this.tPartition = tp;
-    partName = "";
-    if(table.isPartitioned()) {
-      try {
-        partName = Warehouse.makePartName(tbl.getPartCols(), tp.getValues());
-      } catch (MetaException e) {
-        throw new HiveException("Invalid partition for table " + tbl.getName(), e);
-      }
-      this.partPath = new Path(table.getPath(), partName);
-    } else {
-      // We are in the HACK territory. SemanticAnalyzer expects a single partition whose schema
-      // is same as the table partition.
-      this.partPath = table.getPath();
-    }
-    spec = createSpec(tbl, tp);
-
-    URI tmpURI = table.getDataLocation();
-    try {
-      partURI = new URI(tmpURI.getScheme(), tmpURI.getAuthority(),
-          tmpURI.getPath() + "/" + partName, null, null);
-    } catch (URISyntaxException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  /**
-   * Creates a partition name -> value spec map object
-   * @param tbl Use the information from this table.
-   * @param tp Use the information from this partition.
-   * @return Partition name to value mapping.
-   */
-  private LinkedHashMap<String, String> createSpec(Table tbl,
-      org.apache.hadoop.hive.metastore.api.Partition tp) {
-
-    List<FieldSchema> fsl = tbl.getPartCols();
-    List<String> tpl = tp.getValues();
-    LinkedHashMap<String, String> spec = new LinkedHashMap<String, String>();
-    for (int i = 0; i < tbl.getPartCols().size(); i++) {
-      FieldSchema fs = fsl.get(i);
-      String value = tpl.get(i);
-      spec.put(fs.getName(), value);
-    }
-    return spec;
-  }
+  private LinkedHashMap<String, String> spec;
-  public URI makePartURI(LinkedHashMap<String, String> spec) throws HiveException {
-    StringBuffer suffix = new StringBuffer();
-    if (table.getPartCols() != null) {
-      for(FieldSchema k: table.getPartCols()) {
-        suffix.append(k + "=" + spec.get(k.getName()) + "/");
-      }
-    }
-    URI tmpURI = table.getDataLocation();
+  /**
+   * @return
+   * @see org.apache.hadoop.hive.metastore.api.Partition#getValues()
+   */
+  public List<String> getValues() {
+    return tPartition.getValues();
+  }
+
+  private Path partPath;
+  private URI partURI;
+
+  public Partition(Table tbl, org.apache.hadoop.hive.metastore.api.Partition tp) throws HiveException {
+    this.table = tbl;
+    this.tPartition = tp;
+    partName = "";
+    if(table.isPartitioned()) {
     try {
-      return new URI(tmpURI.getScheme(), tmpURI.getAuthority(),
-          tmpURI.getPath() + suffix.toString(), null, null);
-    } catch (URISyntaxException e) {
-      throw new HiveException(e);
+        partName = Warehouse.makePartName(tbl.getPartCols(), tp.getValues());
+      } catch (MetaException e) {
+        throw new HiveException("Invalid partition for table " + tbl.getName(), e);
     }
+      this.partPath = new Path(table.getPath(), partName);
+    } else {
+      // We are in the HACK territory. SemanticAnalyzer expects a single partition whose schema
+      // is same as the table partition.
+      this.partPath = table.getPath();
   }
+    spec = createSpec(tbl, tp);
-  public String getName() {
-    return partName;
+    URI tmpURI = table.getDataLocation();
+    try {
+      partURI = new URI(tmpURI.getScheme(), tmpURI.getAuthority(),
+          tmpURI.getPath() + "/" + partName, null, null);
+    } catch (URISyntaxException e) {
+      throw new RuntimeException(e);
   }
+  }
-  public Table getTable() {
-    return (this.table);
-  }
+  /**
+   * Creates a partition name -> value spec map object
+   * @param tbl Use the information from this table.
+   * @param tp Use the information from this partition.
+   * @return Partition name to value mapping.
+   */
+  private LinkedHashMap<String, String> createSpec(Table tbl,
+      org.apache.hadoop.hive.metastore.api.Partition tp) {
-  public Path [] getPath() {
-    Path [] ret = new Path [1];
-    ret[0] = this.partPath;
-    return(ret);
+    List<FieldSchema> fsl = tbl.getPartCols();
+    List<String> tpl = tp.getValues();
+    LinkedHashMap<String, String> spec = new LinkedHashMap<String, String>();
+    for (int i = 0; i < tbl.getPartCols().size(); i++) {
+      FieldSchema fs = fsl.get(i);
+      String value = tpl.get(i);
+      spec.put(fs.getName(), value);
     }
+    return spec;
+  }
-  public Path getPartitionPath() {
-    return this.partPath;
+  public URI makePartURI(LinkedHashMap<String, String> spec) throws HiveException {
+    StringBuffer suffix = new StringBuffer();
+    if (table.getPartCols() != null) {
+      for(FieldSchema k: table.getPartCols()) {
+        suffix.append(k + "=" + spec.get(k.getName()) + "/");
+      }
     }
-
-  final public URI getDataLocation() {
-    return this.partURI;
+    URI tmpURI = table.getDataLocation();
+    try {
+      return new URI(tmpURI.getScheme(), tmpURI.getAuthority(),
+          tmpURI.getPath() + suffix.toString(), null, null);
+    } catch (URISyntaxException e) {
+      throw new HiveException(e);
     }
+  }
-  /**
-   * The number of buckets is a property of the partition. However - internally we are just
-   * storing it as a property of the table as a short term measure.
-   */
-  public int getBucketCount() {
-    return this.table.getNumBuckets();
-    /*
+  public String getName() {
+    return partName;
+  }
+
+  public Table getTable() {
+    return (this.table);
+  }
+
+  public Path [] getPath() {
+    Path [] ret = new Path [1];
+    ret[0] = this.partPath;
+    return(ret);
+  }
+
+  public Path getPartitionPath() {
+    return this.partPath;
+  }
+
+  final public URI getDataLocation() {
+    return this.partURI;
+  }
+
+  /**
+   * The number of buckets is a property of the partition. However - internally we are just
+   * storing it as a property of the table as a short term measure.
+   */
+  public int getBucketCount() {
+    return this.table.getNumBuckets();
+    /*
      TODO: Keeping this code around for later use
      when we will support sampling on tables which are not created with CLUSTERED INTO clause
-
+
     // read from table meta data
     int numBuckets = this.table.getNumBuckets();
     if (numBuckets == -1) {
@@ -177,147 +193,148 @@
       }
     }
     return numBuckets;
-    */
+    */
+  }
+
+  public List<String> getBucketCols() {
+    return this.table.getBucketCols();
+  }
+
+  /**
+   * mapping from bucket number to bucket path
+   */
+  //TODO: add test case and clean it up
+  @SuppressWarnings("nls")
+  public Path getBucketPath(int bucketNum) {
+    try {
+      FileSystem fs = FileSystem.get(this.table.getDataLocation(), Hive.get().getConf());
+      String pathPattern = this.partPath.toString();
+      if (getBucketCount() > 0) {
+        pathPattern = pathPattern + "/*";
+      }
+      LOG.info("Path pattern = " + pathPattern);
+      FileStatus srcs[] = fs.globStatus(new Path(pathPattern));
+      Arrays.sort(srcs, new PathNameComparator());
+      for (FileStatus src: srcs) {
+        LOG.info("Got file: " + src.getPath());
+      }
+      return srcs[bucketNum].getPath();
    }
-
-  public List<String> getBucketCols() {
-    return this.table.getBucketCols();
+    catch (Exception e) {
+      throw new RuntimeException("Cannot get bucket path for bucket " + bucketNum, e);
    }
+    // return new Path(this.partPath, String.format("part-%1$05d", bucketNum));
+  }
-  /**
-   * mapping from bucket number to bucket path
-   */
-  //TODO: add test case and clean it up
-  @SuppressWarnings("nls")
-  public Path getBucketPath(int bucketNum) {
-    try {
-      FileSystem fs = FileSystem.get(this.table.getDataLocation(), Hive.get().getConf());
-      String pathPattern = this.partPath.toString();
-      if (getBucketCount() > 0) {
-        pathPattern = pathPattern + "/*";
-      }
-      LOG.info("Path pattern = " + pathPattern);
-      FileStatus srcs[] = fs.globStatus(new Path(pathPattern));
-      for (FileStatus src: srcs) {
-        LOG.info("Got file: " + src.getPath());
-      }
-      return srcs[bucketNum].getPath();
-    }
-    catch (Exception e) {
-      throw new RuntimeException("Cannot get bucket path for bucket " + bucketNum, e);
-    }
-    // return new Path(this.partPath, String.format("part-%1$05d", bucketNum));
-  }
+  /**
+   * mapping from a Path to the bucket number if any
+   */
+  private static Pattern bpattern = Pattern.compile("part-([0-9][0-9][0-9][0-9][0-9])");
-  /**
-   * mapping from a Path to the bucket number if any
-   */
-  private static Pattern bpattern = Pattern.compile("part-([0-9][0-9][0-9][0-9][0-9])");
-
-  private String partName;
-  @SuppressWarnings("nls")
-  public static int getBucketNum(Path p) {
-    Matcher m = bpattern.matcher(p.getName());
-    if(m.find()) {
-      String bnum_str = m.group(1);
-      try {
-        return (Integer.parseInt(bnum_str));
-      } catch (NumberFormatException e) {
-        throw new RuntimeException("Unexpected error parsing: "+p.getName()+","+bnum_str);
-      }
-    }
-    return 0;
+  private String partName;
+  @SuppressWarnings("nls")
+  public static int getBucketNum(Path p) {
+    Matcher m = bpattern.matcher(p.getName());
+    if(m.find()) {
+      String bnum_str = m.group(1);
+      try {
+        return (Integer.parseInt(bnum_str));
+      } catch (NumberFormatException e) {
+        throw new RuntimeException("Unexpected error parsing: "+p.getName()+","+bnum_str);
+      }
    }
+    return 0;
+  }
-  @SuppressWarnings("nls")
-  public Path [] getPath(Sample s) throws HiveException {
-    if(s == null) {
-      return getPath();
-    } else {
-      int bcount = this.getBucketCount();
-      if(bcount == 0) {
-        return getPath();
-      }
+  @SuppressWarnings("nls")
+  public Path [] getPath(Sample s) throws HiveException {
+    if(s == null) {
+      return getPath();
+    } else {
+      int bcount = this.getBucketCount();
+      if(bcount == 0) {
+        return getPath();
+      }
-      Dimension d = s.getSampleDimension();
-      if(!d.getDimensionId().equals(this.table.getBucketingDimensionId())) {
-        // if the bucket dimension is not the same as the sampling dimension
-        // we must scan all the data
-        return getPath();
-      }
+      Dimension d = s.getSampleDimension();
+      if(!d.getDimensionId().equals(this.table.getBucketingDimensionId())) {
+        // if the bucket dimension is not the same as the sampling dimension
+        // we must scan all the data
+        return getPath();
+      }
-      int scount = s.getSampleFraction();
-      ArrayList<Path> ret = new ArrayList<Path>();
+      int scount = s.getSampleFraction();
+      ArrayList<Path> ret = new ArrayList<Path>();
-      if(bcount == scount) {
-        ret.add(getBucketPath(s.getSampleNum()-1));
-      } else if (bcount < scount) {
-        if((scount/bcount)*bcount != scount) {
-          throw new HiveException("Sample Count"+scount+" is not a multiple of bucket count " +
-              bcount + " for table " + this.table.getName());
-        }
-        // undersampling a bucket
-        ret.add(getBucketPath((s.getSampleNum()-1)%bcount));
-      } else if (bcount > scount) {
-        if((bcount/scount)*scount != bcount) {
-          throw new HiveException("Sample Count"+scount+" is not a divisor of bucket count " +
-              bcount + " for table " + this.table.getName());
-        }
-        // sampling multiple buckets
-        for(int i=0; i
+      if(bcount == scount) {
+        ret.add(getBucketPath(s.getSampleNum()-1));
+      } else if (bcount < scount) {
+        if((scount/bcount)*bcount != scount) {
+          throw new HiveException("Sample Count"+scount+" is not a multiple of bucket count " +
+              bcount + " for table " + this.table.getName());
+        }
+        // undersampling a bucket
+        ret.add(getBucketPath((s.getSampleNum()-1)%bcount));
+      } else if (bcount > scount) {
+        if((bcount/scount)*scount != bcount) {
+          throw new HiveException("Sample Count"+scount+" is not a divisor of bucket count " +
+              bcount + " for table " + this.table.getName());
+        }
+        // sampling multiple buckets
+        for(int i=0; i
-  public LinkedHashMap<String, String> getSpec() {
-    return this.spec;
-  }
+  public LinkedHashMap<String, String> getSpec() {
+    return this.spec;
+  }
-  /**
-   * Replaces files in the partition with new data set specified by srcf. Works by moving files
-   *
-   * @param srcf Files to be moved. Leaf Directories or Globbed File Paths
-   */
-  @SuppressWarnings("nls")
-  protected void replaceFiles(Path srcf) throws HiveException {
-    FileSystem fs;
-    try {
-      fs = FileSystem.get(table.getDataLocation(), Hive.get().getConf());
-      Hive.get().replaceFiles(srcf, partPath, fs);
-    } catch (IOException e) {
-      throw new HiveException("addFiles: filesystem error in check phase", e);
-    }
+  /**
+   * Replaces files in the partition with new data set specified by srcf. Works by moving files
+   *
+   * @param srcf Files to be moved. Leaf Directories or Globbed File Paths
+   */
+  @SuppressWarnings("nls")
+  protected void replaceFiles(Path srcf) throws HiveException {
+    FileSystem fs;
+    try {
+      fs = FileSystem.get(table.getDataLocation(), Hive.get().getConf());
+      Hive.get().replaceFiles(srcf, partPath, fs);
+    } catch (IOException e) {
+      throw new HiveException("addFiles: filesystem error in check phase", e);
    }
+  }
-  /**
-   * Inserts files specified into the partition. Works by moving files
-   *
-   * @param srcf Files to be moved. Leaf Directories or Globbed File Paths
-   */
-  @SuppressWarnings("nls")
-  protected void copyFiles(Path srcf) throws HiveException {
-    FileSystem fs;
-    try {
-      fs = FileSystem.get(table.getDataLocation(), Hive.get().getConf());
-      Hive.get().copyFiles(srcf, partPath, fs);
-    } catch (IOException e) {
-      throw new HiveException("addFiles: filesystem error in check phase", e);
-    }
+  /**
+   * Inserts files specified into the partition. Works by moving files
+   *
+   * @param srcf Files to be moved. Leaf Directories or Globbed File Paths
+   */
+  @SuppressWarnings("nls")
+  protected void copyFiles(Path srcf) throws HiveException {
+    FileSystem fs;
+    try {
+      fs = FileSystem.get(table.getDataLocation(), Hive.get().getConf());
+      Hive.get().copyFiles(srcf, partPath, fs);
+    } catch (IOException e) {
+      throw new HiveException("addFiles: filesystem error in check phase", e);
     }
+  }
-  @SuppressWarnings("nls")
-  @Override
-  public String toString() {
-    String pn = "Invalid Partition";
-    try {
-      pn = Warehouse.makePartName(spec);
-    } catch (MetaException e) {
-      // ignore as we most probably in an exception path already otherwise this error wouldn't occur
-    }
-    return table.toString() + "(" + pn + ")";
+  @SuppressWarnings("nls")
+  @Override
+  public String toString() {
+    String pn = "Invalid Partition";
+    try {
+      pn = Warehouse.makePartName(spec);
+    } catch (MetaException e) {
+      // ignore as we most probably in an exception path already otherwise this error wouldn't occur
     }
+    return table.toString() + "(" + pn + ")";
+  }
 }
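
The functional core of the Partition.java change is getBucketPath(): it now sorts the FileStatus array returned by globStatus() with the new PathNameComparator before indexing it by bucket number, so a given bucket number always resolves to the same file instead of depending on whatever order the filesystem happens to return the matches. The sketch below illustrates that idea in isolation; it is not the patched class itself, and everything in it other than the standard Hadoop FileSystem/FileStatus/Path/Configuration API (the class name, method signature, and directory layout) is an illustrative assumption.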
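// Sketch only: deterministic bucket-file lookup by sorting globbed paths.
// Assumes a partition directory laid out as <partDir>/part-00000, part-00001, ...
import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class BucketPathLookup {

  // Orders files by their full path, mirroring PathNameComparator in the patch.
  static class PathNameComparator implements Comparator<FileStatus> {
    public int compare(FileStatus f1, FileStatus f2) {
      return f1.getPath().compareTo(f2.getPath());
    }
  }

  // Hypothetical helper: returns the file backing bucket 'bucketNum' under 'partDir'.
  // Without the Arrays.sort call, globStatus() gives no ordering guarantee, so the
  // same bucket number could resolve to different files on different runs.
  public static Path getBucketPath(Configuration conf, Path partDir, int bucketNum)
      throws IOException {
    FileSystem fs = partDir.getFileSystem(conf);
    FileStatus[] srcs = fs.globStatus(new Path(partDir, "*"));
    if (srcs == null || bucketNum >= srcs.length) {
      throw new IOException("No file for bucket " + bucketNum + " under " + partDir);
    }
    Arrays.sort(srcs, new PathNameComparator());
    return srcs[bucketNum].getPath();
  }
}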
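Sorting by path name lines up with how bucketed output files are named: the existing getBucketNum() pattern part-([0-9][0-9][0-9][0-9][0-9]) and the commented-out String.format("part-%1$05d", bucketNum) both assume zero-padded part-NNNNN file names, for which lexicographic path order coincides with bucket order; a different naming scheme would need a different comparator. The input3_limit.q change follows the same determinism theme: routing T1 through DISTRIBUTE BY key SORT BY key, value before the LIMIT, and reading T2 back with a plain SELECT, makes the 20 rows written to T2, and therefore the golden output in input3_limit.q.out, independent of file and split ordering.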