Index: ql/src/test/results/clientpositive/input3_limit.q.out
===================================================================
--- ql/src/test/results/clientpositive/input3_limit.q.out  (revision 739448)
+++ ql/src/test/results/clientpositive/input3_limit.q.out  (working copy)
@@ -1,32 +1,68 @@
 ABSTRACT SYNTAX TREE:
-  (TOK_QUERY (TOK_FROM (TOK_TABREF T1 a)) (TOK_INSERT (TOK_DESTINATION (TOK_TAB T2)) (TOK_SELECT (TOK_SELEXPR (TOK_COLREF a key)) (TOK_SELEXPR (TOK_COLREF a value))) (TOK_LIMIT 20)))
+  (TOK_QUERY (TOK_FROM (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_TABREF T1)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)) (TOK_DISTRIBUTEBY (TOK_COLREF key)) (TOK_SORTBY (TOK_TABSORTCOLNAMEASC (TOK_COLREF key)) (TOK_TABSORTCOLNAMEASC (TOK_COLREF value))))) T)) (TOK_INSERT (TOK_DESTINATION (TOK_TAB T2)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)) (TOK_LIMIT 20)))
 
 STAGE DEPENDENCIES:
   Stage-1 is a root stage
-  Stage-0 depends on stages: Stage-1
+  Stage-2 depends on stages: Stage-1
+  Stage-0 depends on stages: Stage-2
 
 STAGE PLANS:
   Stage: Stage-1
     Map Reduce
       Alias -> Map Operator Tree:
-        a 
+        t:t1 
           Select Operator
            expressions:
                  expr: key
                  type: string
                  expr: value
                  type: string
-          Limit
-            Reduce Output Operator
-              sort order: 
-              tag: -1
-              value expressions:
-                    expr: 0
-                    type: string
-                    expr: 1
-                    type: string
+          Reduce Output Operator
+            key expressions:
+                  expr: 0
+                  type: string
+                  expr: 1
+                  type: string
+            sort order: ++
+            Map-reduce partition columns:
+                  expr: 0
+                  type: string
+            tag: -1
+            value expressions:
+                  expr: 0
+                  type: string
+                  expr: 1
+                  type: string
       Reduce Operator Tree:
         Extract
+        Select Operator
+          expressions:
+                expr: 0
+                type: string
+                expr: 1
+                type: string
+          Limit
+            File Output Operator
+              compressed: false
+              table:
+                  input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                  output format: org.apache.hadoop.mapred.SequenceFileOutputFormat
+                  name: binary_table
+
+  Stage: Stage-2
+    Map Reduce
+      Alias -> Map Operator Tree:
+        /data/users/athusoo/commits/hive_trunk_ws1/build/ql/tmp/156027535/510873534.10001 
+          Reduce Output Operator
+            sort order: 
+            tag: -1
+            value expressions:
+                  expr: 0
+                  type: string
+                  expr: 1
+                  type: string
+      Reduce Operator Tree:
+        Extract
         Limit
           File Output Operator
             compressed: false
@@ -47,23 +83,23 @@
               name: t2
 
-128 val_128
-150 val_150
-165 val_165
-193 val_193
-213 val_213
-224 val_224
-238 val_238
-255 val_255
-265 val_265
-27 val_27
-273 val_273
-278 val_278
-311 val_311
-369 val_369
-401 val_401
-409 val_409
-484 val_484
-66 val_66
-86 val_86
-98 val_98
+0 val_0
+0 val_0
+0 val_0
+0 val_1
+0 val_1
+1 val_2
+10 val_10
+10 val_11
+100 val_100
+100 val_100
+100 val_101
+100 val_101
+101 val_102
+102 val_103
+103 val_103
+103 val_103
+104 val_104
+104 val_104
+104 val_105
+104 val_105
Index: ql/src/test/queries/clientpositive/input3_limit.q
===================================================================
--- ql/src/test/queries/clientpositive/input3_limit.q  (revision 739448)
+++ ql/src/test/queries/clientpositive/input3_limit.q  (working copy)
@@ -7,12 +7,11 @@
 CREATE TABLE T2(key STRING, value STRING);
 
 EXPLAIN
-INSERT OVERWRITE TABLE T2 SELECT a.key, a.value from T1 a LIMIT 20;
+INSERT OVERWRITE TABLE T2 SELECT * FROM (SELECT * FROM T1 DISTRIBUTE BY key SORT BY key, value) T LIMIT 20;
 
+INSERT OVERWRITE TABLE T2 SELECT * FROM (SELECT * FROM T1 DISTRIBUTE BY key SORT BY key, value) T LIMIT 20;
-INSERT OVERWRITE TABLE T2 SELECT a.key, a.value from T1 a LIMIT 20;
 
+SELECT * FROM T2;
-SELECT * FROM (SELECT * FROM T2 DISTRIBUTE BY key SORT BY key, value) T;
-
 DROP TABLE T1;
 DROP TABLE T2;
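Reviewer note (illustrative aside, not part of the patch): the rewritten query sorts T1 by (key, value) in a subquery before applying LIMIT 20, so the 20 rows written to T2 no longer depend on file or split ordering. That is why the expected rows above change and why the plan gains a second map-reduce stage that applies the final LIMIT to the sorted temporary output. A minimal standalone Java sketch of the same semantics (sort by key, then value, then keep the first N rows); the class name and sample data below are hypothetical:

import java.util.Arrays;
import java.util.Comparator;

public class DeterministicLimitSketch {
  public static void main(String[] args) {
    // Hypothetical (key, value) rows, i.e. the contents of T1 in arbitrary read order.
    String[][] rows = { {"128", "val_128"}, {"0", "val_1"}, {"0", "val_0"}, {"10", "val_10"} };

    // Sorting by key, then value, before taking the first N rows is the property the
    // rewritten query gets from DISTRIBUTE BY key SORT BY key, value: the rows kept
    // by LIMIT no longer depend on the order the input happened to be read in.
    Arrays.sort(rows, Comparator.<String[], String>comparing(r -> r[0]).thenComparing(r -> r[1]));

    int limit = Math.min(2, rows.length);
    for (int i = 0; i < limit; i++) {
      System.out.println(rows[i][0] + "\t" + rows[i][1]);
    }
  }
}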
Index: ql/src/test/queries/clientpositive/sample3.q
===================================================================
--- ql/src/test/queries/clientpositive/sample3.q  (revision 739448)
+++ ql/src/test/queries/clientpositive/sample3.q  (working copy)
@@ -1,5 +1,5 @@
 -- no input pruning, sample filter
-EXPLAIN
+EXPLAIN EXTENDED
 SELECT s.key
 FROM srcbucket TABLESAMPLE (BUCKET 1 OUT OF 5 on key) s;
Index: ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java  (revision 739448)
+++ ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java  (working copy)
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -46,142 +47,141 @@
  */
 public class Partition {
-  @SuppressWarnings("nls")
-  static final private Log LOG = LogFactory.getLog("hive.ql.metadata.Partition");
-
-  private Table table;
-  private org.apache.hadoop.hive.metastore.api.Partition tPartition;
-  /**
-   * @return the tPartition
-   */
-  public org.apache.hadoop.hive.metastore.api.Partition getTPartition() {
-    return tPartition;
-  }
-
-  private LinkedHashMap<String, String> spec;
-
-  /**
-   * @return
-   * @see org.apache.hadoop.hive.metastore.api.Partition#getValues()
-   */
-  public List<String> getValues() {
-    return tPartition.getValues();
-  }
-
-  private Path partPath;
-  private URI partURI;
-
-  public Partition(Table tbl, org.apache.hadoop.hive.metastore.api.Partition tp) throws HiveException {
-    initialize(tbl, tp);
-  }
-
-  /**
-   * Create partition object with the given info.
-   * @param tbl Table the partition will be in.
-   * @param partSpec Partition specifications.
-   * @param location Location of the partition, relative to the table.
-   * @throws HiveException Thrown if we could not create the partition.
-   */
-  public Partition(Table tbl, Map<String, String> partSpec,
-      Path location) throws HiveException {
-
-    List<String> pvals = new ArrayList<String>();
-    for (FieldSchema field : tbl.getPartCols()) {
-      pvals.add(partSpec.get(field.getName()));
-    }
-
-    org.apache.hadoop.hive.metastore.api.Partition tpart =
-      new org.apache.hadoop.hive.metastore.api.Partition();
-    tpart.setDbName(tbl.getDbName());
-    tpart.setTableName(tbl.getName());
-    tpart.setValues(pvals);
-
-    StorageDescriptor sd = new StorageDescriptor();
-    try {
-      //replace with THRIFT-138
-      TMemoryBuffer buffer = new TMemoryBuffer(1024);
-      TBinaryProtocol prot = new TBinaryProtocol(buffer);
-      tbl.getTTable().getSd().write(prot);
-
-      sd.read(prot);
-    } catch (TException e) {
-      LOG.error("Could not create a copy of StorageDescription");
-      throw new HiveException("Could not create a copy of StorageDescription");
-    }
-
-    tpart.setSd(sd);
-    tpart.getSd().setLocation(location.toString());
-
-    initialize(tbl, tpart);
-  }
-
-  /**
-   * Initializes this object with the given variables
-   * @param tbl Table the partition belongs to
-   * @param tp Thrift Partition object
-   * @throws HiveException Thrown if we cannot initialize the partition
-   */
-  private void initialize(Table tbl,
-      org.apache.hadoop.hive.metastore.api.Partition tp)
-      throws HiveException {
-
-    this.table = tbl;
-    this.tPartition = tp;
-    this.partName = "";
-
-    if(tbl.isPartitioned()) {
-      try {
-        this.partName = Warehouse.makePartName(tbl.getPartCols(),
-            tp.getValues());
-      } catch (MetaException e) {
-        throw new HiveException("Invalid partition for table " + tbl.getName(),
-            e);
-      }
-      this.partPath = new Path(tp.getSd().getLocation());
-    } else {
-      // We are in the HACK territory.
-      // SemanticAnalyzer expects a single partition whose schema
-      // is same as the table partition.
-      this.partPath = table.getPath();
-    }
-
-    this.spec = new LinkedHashMap<String, String>(tbl.createSpec(tp));
-    this.partURI = partPath.toUri();
-  }
-
-  public String getName() {
-    return partName;
-  }
-
-  public Table getTable() {
-    return (this.table);
-  }
-
-  public Path [] getPath() {
-    Path [] ret = new Path [1];
-    ret[0] = this.partPath;
-    return(ret);
-  }
-
-  public Path getPartitionPath() {
-    return this.partPath;
-  }
-
-  final public URI getDataLocation() {
-    return this.partURI;
-  }
-
-  /**
-   * The number of buckets is a property of the partition. However - internally we are just
-   * storing it as a property of the table as a short term measure.
-   */
-  public int getBucketCount() {
-    return this.table.getNumBuckets();
-    /*
+  @SuppressWarnings("nls")
+  static final private Log LOG = LogFactory.getLog("hive.ql.metadata.Partition");
+
+  private Table table;
+  private org.apache.hadoop.hive.metastore.api.Partition tPartition;
+  /**
+   * @return the tPartition
+   */
+  public org.apache.hadoop.hive.metastore.api.Partition getTPartition() {
+    return tPartition;
+  }
+
+  private LinkedHashMap<String, String> spec;
+
+  /**
+   * @return
+   * @see org.apache.hadoop.hive.metastore.api.Partition#getValues()
+   */
+  public List<String> getValues() {
+    return tPartition.getValues();
+  }
+
+  private Path partPath;
+  private URI partURI;
+
+  public Partition(Table tbl, org.apache.hadoop.hive.metastore.api.Partition tp) throws HiveException {
+    initialize(tbl, tp);
+  }
+
+  /**
+   * Create partition object with the given info.
+   * @param tbl Table the partition will be in.
+   * @param partSpec Partition specifications.
+   * @param location Location of the partition, relative to the table.
+   * @throws HiveException Thrown if we could not create the partition.
+   */
+  public Partition(Table tbl, Map<String, String> partSpec,
+      Path location) throws HiveException {
+
+    List<String> pvals = new ArrayList<String>();
+    for (FieldSchema field : tbl.getPartCols()) {
+      pvals.add(partSpec.get(field.getName()));
+    }
+
+    org.apache.hadoop.hive.metastore.api.Partition tpart =
+      new org.apache.hadoop.hive.metastore.api.Partition();
+    tpart.setDbName(tbl.getDbName());
+    tpart.setTableName(tbl.getName());
+    tpart.setValues(pvals);
+
+    StorageDescriptor sd = new StorageDescriptor();
+    try {
+      //replace with THRIFT-138
+      TMemoryBuffer buffer = new TMemoryBuffer(1024);
+      TBinaryProtocol prot = new TBinaryProtocol(buffer);
+      tbl.getTTable().getSd().write(prot);
+
+      sd.read(prot);
+    } catch (TException e) {
+      LOG.error("Could not create a copy of StorageDescription");
+      throw new HiveException("Could not create a copy of StorageDescription");
+    }
+
+    tpart.setSd(sd);
+    tpart.getSd().setLocation(location.toString());
+
+    initialize(tbl, tpart);
+  }
+
+  /**
+   * Initializes this object with the given variables
+   * @param tbl Table the partition belongs to
+   * @param tp Thrift Partition object
+   * @throws HiveException Thrown if we cannot initialize the partition
+   */
+  private void initialize(Table tbl,
+      org.apache.hadoop.hive.metastore.api.Partition tp)
+      throws HiveException {
+
+    this.table = tbl;
+    this.tPartition = tp;
+    this.partName = "";
+
+    if(tbl.isPartitioned()) {
+      try {
+        this.partName = Warehouse.makePartName(tbl.getPartCols(),
+            tp.getValues());
+      } catch (MetaException e) {
+        throw new HiveException("Invalid partition for table " + tbl.getName(),
+            e);
+      }
+      this.partPath = new Path(tp.getSd().getLocation());
+    } else {
+      // We are in the HACK territory.
+      // SemanticAnalyzer expects a single partition whose schema
+      // is same as the table partition.
+      this.partPath = table.getPath();
+    }
+
+    this.spec = new LinkedHashMap<String, String>(tbl.createSpec(tp));
+    this.partURI = partPath.toUri();
+  }
+
+  public String getName() {
+    return partName;
+  }
+
+  public Table getTable() {
+    return (this.table);
+  }
+
+  public Path [] getPath() {
+    Path [] ret = new Path [1];
+    ret[0] = this.partPath;
+    return(ret);
+  }
+
+  public Path getPartitionPath() {
+    return this.partPath;
+  }
+
+  final public URI getDataLocation() {
+    return this.partURI;
+  }
+
+  /**
+   * The number of buckets is a property of the partition. However - internally we are just
+   * storing it as a property of the table as a short term measure.
+   */
+  public int getBucketCount() {
+    return this.table.getNumBuckets();
+    /*
       TODO: Keeping this code around for later use when we will support sampling on tables which are not created with CLUSTERED INTO clause
-
+
       // read from table meta data
       int numBuckets = this.table.getNumBuckets();
       if (numBuckets == -1) {
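Reviewer note (illustrative aside, not part of the patch): the constructor above deep-copies the table's StorageDescriptor by writing it to an in-memory Thrift transport and reading it back, a stop-gap until a proper deepCopy helper exists (hence the "//replace with THRIFT-138" comment). A minimal sketch of that serialize/deserialize idiom follows; the helper class is hypothetical, and the Thrift package name depends on the era (com.facebook.thrift in Hive of this vintage, org.apache.thrift later):

import org.apache.thrift.TBase;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TMemoryBuffer;

public final class ThriftCopySketch {
  private ThriftCopySketch() {}

  // Serialize src into an in-memory buffer, then read the bytes back into dst,
  // leaving dst as a field-by-field copy of src.
  public static <T extends TBase> void copyInto(T src, T dst) throws TException {
    TMemoryBuffer buffer = new TMemoryBuffer(1024); // initial size; the buffer grows as needed
    TBinaryProtocol prot = new TBinaryProtocol(buffer);
    src.write(prot);
    dst.read(prot);
  }
}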
@@ -198,147 +198,148 @@
       }
     }
     return numBuckets;
-    */
-  }
-
-  public List<String> getBucketCols() {
-    return this.table.getBucketCols();
-  }
-
-  /**
-   * mapping from bucket number to bucket path
-   */
-  //TODO: add test case and clean it up
-  @SuppressWarnings("nls")
-  public Path getBucketPath(int bucketNum) {
-    try {
-      FileSystem fs = FileSystem.get(this.table.getDataLocation(), Hive.get().getConf());
-      String pathPattern = this.partPath.toString();
-      if (getBucketCount() > 0) {
-        pathPattern = pathPattern + "/*";
-      }
-      LOG.info("Path pattern = " + pathPattern);
-      FileStatus srcs[] = fs.globStatus(new Path(pathPattern));
-      for (FileStatus src: srcs) {
-        LOG.info("Got file: " + src.getPath());
-      }
-      return srcs[bucketNum].getPath();
-    }
-    catch (Exception e) {
-      throw new RuntimeException("Cannot get bucket path for bucket " + bucketNum, e);
-    }
-    // return new Path(this.partPath, String.format("part-%1$05d", bucketNum));
-  }
-
-  /**
-   * mapping from a Path to the bucket number if any
-   */
-  private static Pattern bpattern = Pattern.compile("part-([0-9][0-9][0-9][0-9][0-9])");
-
-  private String partName;
-  @SuppressWarnings("nls")
-  public static int getBucketNum(Path p) {
-    Matcher m = bpattern.matcher(p.getName());
-    if(m.find()) {
-      String bnum_str = m.group(1);
-      try {
-        return (Integer.parseInt(bnum_str));
-      } catch (NumberFormatException e) {
-        throw new RuntimeException("Unexpected error parsing: "+p.getName()+","+bnum_str);
-      }
-    }
-    return 0;
-  }
-
-  @SuppressWarnings("nls")
-  public Path [] getPath(Sample s) throws HiveException {
-    if(s == null) {
-      return getPath();
-    } else {
-      int bcount = this.getBucketCount();
-      if(bcount == 0) {
-        return getPath();
-      }
-
-      Dimension d = s.getSampleDimension();
-      if(!d.getDimensionId().equals(this.table.getBucketingDimensionId())) {
-        // if the bucket dimension is not the same as the sampling dimension
-        // we must scan all the data
-        return getPath();
-      }
-
-      int scount = s.getSampleFraction();
-      ArrayList<Path> ret = new ArrayList<Path>();
-
-      if(bcount == scount) {
-        ret.add(getBucketPath(s.getSampleNum()-1));
-      } else if (bcount < scount) {
-        if((scount/bcount)*bcount != scount) {
-          throw new HiveException("Sample Count"+scount+" is not a multiple of bucket count " +
-              bcount + " for table " + this.table.getName());
-        }
-        // undersampling a bucket
-        ret.add(getBucketPath((s.getSampleNum()-1)%bcount));
-      } else if (bcount > scount) {
-        if((bcount/scount)*scount != bcount) {
-          throw new HiveException("Sample Count"+scount+" is not a divisor of bucket count " +
-              bcount + " for table " + this.table.getName());
-        }
-        // sampling multiple buckets
-        for(int i=0; i<bcount/scount; i++) {
-          ret.add(getBucketPath(i*scount + (s.getSampleNum()-1)));
-        }
-      }
-      return(ret.toArray(new Path[ret.size()]));
-    }
-  }
-
-  public LinkedHashMap<String, String> getSpec() {
-    return this.spec;
-  }
-
-  /**
-   * Replaces files in the partition with new data set specified by srcf. Works by moving files
-   *
-   * @param srcf Files to be moved. Leaf Directories or Globbed File Paths
-   */
-  @SuppressWarnings("nls")
-  protected void replaceFiles(Path srcf) throws HiveException {
-    FileSystem fs;
-    try {
-      fs = FileSystem.get(table.getDataLocation(), Hive.get().getConf());
-      Hive.get().replaceFiles(srcf, partPath, fs);
-    } catch (IOException e) {
-      throw new HiveException("addFiles: filesystem error in check phase", e);
-    }
-  }
-
-  /**
-   * Inserts files specified into the partition. Works by moving files
-   *
-   * @param srcf Files to be moved. Leaf Directories or Globbed File Paths
-   */
-  @SuppressWarnings("nls")
-  protected void copyFiles(Path srcf) throws HiveException {
-    FileSystem fs;
-    try {
-      fs = FileSystem.get(table.getDataLocation(), Hive.get().getConf());
-      Hive.get().copyFiles(srcf, partPath, fs);
-    } catch (IOException e) {
-      throw new HiveException("addFiles: filesystem error in check phase", e);
-    }
-  }
-
-  @SuppressWarnings("nls")
-  @Override
-  public String toString() {
-    String pn = "Invalid Partition";
-    try {
-      pn = Warehouse.makePartName(spec);
-    } catch (MetaException e) {
-      // ignore as we most probably in an exception path already otherwise this error wouldn't occur
-    }
-    return table.toString() + "(" + pn + ")";
-  }
+    */
+  }
+
+  public List<String> getBucketCols() {
+    return this.table.getBucketCols();
+  }
+
+  /**
+   * mapping from bucket number to bucket path
+   */
+  //TODO: add test case and clean it up
+  @SuppressWarnings("nls")
+  public Path getBucketPath(int bucketNum) {
+    try {
+      FileSystem fs = FileSystem.get(this.table.getDataLocation(), Hive.get().getConf());
+      String pathPattern = this.partPath.toString();
+      if (getBucketCount() > 0) {
+        pathPattern = pathPattern + "/*";
+      }
+      LOG.info("Path pattern = " + pathPattern);
+      FileStatus srcs[] = fs.globStatus(new Path(pathPattern));
+      Arrays.sort(srcs);
+      for (FileStatus src: srcs) {
+        LOG.info("Got file: " + src.getPath());
+      }
+      return srcs[bucketNum].getPath();
+    }
+    catch (Exception e) {
+      throw new RuntimeException("Cannot get bucket path for bucket " + bucketNum, e);
+    }
+    // return new Path(this.partPath, String.format("part-%1$05d", bucketNum));
+  }
+
+  /**
+   * mapping from a Path to the bucket number if any
+   */
+  private static Pattern bpattern = Pattern.compile("part-([0-9][0-9][0-9][0-9][0-9])");
+
+  private String partName;
+  @SuppressWarnings("nls")
+  public static int getBucketNum(Path p) {
+    Matcher m = bpattern.matcher(p.getName());
+    if(m.find()) {
+      String bnum_str = m.group(1);
+      try {
+        return (Integer.parseInt(bnum_str));
+      } catch (NumberFormatException e) {
+        throw new RuntimeException("Unexpected error parsing: "+p.getName()+","+bnum_str);
+      }
+    }
+    return 0;
+  }
+
+  @SuppressWarnings("nls")
+  public Path [] getPath(Sample s) throws HiveException {
+    if(s == null) {
+      return getPath();
+    } else {
+      int bcount = this.getBucketCount();
+      if(bcount == 0) {
+        return getPath();
+      }
+
+      Dimension d = s.getSampleDimension();
+      if(!d.getDimensionId().equals(this.table.getBucketingDimensionId())) {
+        // if the bucket dimension is not the same as the sampling dimension
+        // we must scan all the data
+        return getPath();
+      }
+
+      int scount = s.getSampleFraction();
+      ArrayList<Path> ret = new ArrayList<Path>();
+
+      if(bcount == scount) {
+        ret.add(getBucketPath(s.getSampleNum()-1));
+      } else if (bcount < scount) {
+        if((scount/bcount)*bcount != scount) {
+          throw new HiveException("Sample Count"+scount+" is not a multiple of bucket count " +
+              bcount + " for table " + this.table.getName());
+        }
+        // undersampling a bucket
+        ret.add(getBucketPath((s.getSampleNum()-1)%bcount));
+      } else if (bcount > scount) {
+        if((bcount/scount)*scount != bcount) {
+          throw new HiveException("Sample Count"+scount+" is not a divisor of bucket count " +
+              bcount + " for table " + this.table.getName());
+        }
+        // sampling multiple buckets
+        for(int i=0; i<bcount/scount; i++) {
+          ret.add(getBucketPath(i*scount + (s.getSampleNum()-1)));
+        }
+      }
+      return(ret.toArray(new Path[ret.size()]));
+    }
+  }
+
+  public LinkedHashMap<String, String> getSpec() {
+    return this.spec;
+  }
+
+  /**
+   * Replaces files in the partition with new data set specified by srcf. Works by moving files
+   *
+   * @param srcf Files to be moved. Leaf Directories or Globbed File Paths
+   */
+  @SuppressWarnings("nls")
+  protected void replaceFiles(Path srcf) throws HiveException {
+    FileSystem fs;
+    try {
+      fs = FileSystem.get(table.getDataLocation(), Hive.get().getConf());
+      Hive.get().replaceFiles(srcf, partPath, fs);
+    } catch (IOException e) {
+      throw new HiveException("addFiles: filesystem error in check phase", e);
+    }
+  }
+
+  /**
+   * Inserts files specified into the partition. Works by moving files
+   *
+   * @param srcf Files to be moved. Leaf Directories or Globbed File Paths
+   */
+  @SuppressWarnings("nls")
+  protected void copyFiles(Path srcf) throws HiveException {
+    FileSystem fs;
+    try {
+      fs = FileSystem.get(table.getDataLocation(), Hive.get().getConf());
+      Hive.get().copyFiles(srcf, partPath, fs);
+    } catch (IOException e) {
+      throw new HiveException("addFiles: filesystem error in check phase", e);
+    }
+  }
+
+  @SuppressWarnings("nls")
+  @Override
+  public String toString() {
+    String pn = "Invalid Partition";
+    try {
+      pn = Warehouse.makePartName(spec);
+    } catch (MetaException e) {
+      // ignore as we most probably in an exception path already otherwise this error wouldn't occur
+    }
+    return table.toString() + "(" + pn + ")";
+  }
 }
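Closing note (illustrative aside, not part of the patch): the one behavioural change in Partition.java is the added Arrays.sort(srcs) in getBucketPath(). FileSystem.globStatus() makes no ordering promise, and FileStatus compares by path, so sorting the glob results is what makes a given bucket number map to the same file on every run, which keeps bucketed-sampling output stable. A self-contained sketch of the same idea using plain java.io instead of Hadoop's FileSystem; the directory layout (part-00000, part-00001, ...) and the class name are assumptions:

import java.io.File;
import java.util.Arrays;

public class BucketPathSketch {
  // Return the file backing bucket 'bucketNum' of a partition directory.
  // Directory listing order is not guaranteed, so the listing must be sorted
  // before it is indexed by bucket number.
  static File bucketPath(File partitionDir, int bucketNum) {
    File[] files = partitionDir.listFiles();
    if (files == null || bucketNum < 0 || bucketNum >= files.length) {
      throw new IllegalArgumentException("No file for bucket " + bucketNum);
    }
    Arrays.sort(files); // java.io.File compares by path name
    return files[bucketNum];
  }

  public static void main(String[] args) {
    // Hypothetical partition directory containing part-00000, part-00001, ...
    System.out.println(bucketPath(new File("/tmp/t2"), 1));
  }
}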