diff --git hcatalog/streaming/pom.xml hcatalog/streaming/pom.xml index 6d03ce1..ba9f731 100644 --- hcatalog/streaming/pom.xml +++ hcatalog/streaming/pom.xml @@ -104,6 +104,13 @@ test + + org.apache.hadoop + hadoop-mapreduce-client-common + test + ${hadoop-23.version} + + diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java index c959222..404b360 100644 --- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java +++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java @@ -22,9 +22,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Table; @@ -32,13 +32,17 @@ import org.apache.hadoop.hive.ql.io.RecordUpdater; import org.apache.hadoop.hive.serde2.SerDe; import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; import org.apache.hadoop.util.ReflectionUtils; -import org.apache.hive.hcatalog.common.HCatUtil; import org.apache.thrift.TException; import java.io.IOException; -import java.util.Random; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; + abstract class AbstractRecordWriter implements RecordWriter { static final private Log LOG = LogFactory.getLog(AbstractRecordWriter.class.getName()); @@ -47,12 +51,12 @@ final HiveEndPoint endPoint; final Table tbl; - final IMetaStoreClient msClient; - RecordUpdater updater = null; + final HiveMetaStoreClient msClient; + protected final List bucketIds; + ArrayList updaters = null; + + public final int totalBuckets; - private final int totalBuckets; - private Random rand = new Random(); - private int currentBucketId = 0; private final Path partitionPath; final AcidOutputFormat outf; @@ -63,16 +67,17 @@ protected AbstractRecordWriter(HiveEndPoint endPoint, HiveConf conf) this.conf = conf!=null ? 
conf : HiveEndPoint.createHiveConf(DelimitedInputWriter.class, endPoint.metaStoreUri); try { - msClient = HCatUtil.getHiveMetastoreClient(this.conf); + msClient = new HiveMetaStoreClient(this.conf); this.tbl = msClient.getTable(endPoint.database, endPoint.table); this.partitionPath = getPathForEndPoint(msClient, endPoint); this.totalBuckets = tbl.getSd().getNumBuckets(); + this.bucketIds = getBucketColIDs(tbl.getSd().getBucketCols(), tbl.getSd().getCols()) ; if(totalBuckets <= 0) { throw new StreamingException("Cannot stream to table that has not been bucketed : " + endPoint); } String outFormatName = this.tbl.getSd().getOutputFormat(); - outf = (AcidOutputFormat) ReflectionUtils.newInstance(JavaUtils.loadClass(outFormatName), conf); + outf = (AcidOutputFormat) ReflectionUtils.newInstance(Class.forName(outFormatName), conf); } catch (MetaException e) { throw new ConnectionError(endPoint, e); } catch (NoSuchObjectException e) { @@ -81,11 +86,21 @@ protected AbstractRecordWriter(HiveEndPoint endPoint, HiveConf conf) throw new StreamingException(e.getMessage(), e); } catch (ClassNotFoundException e) { throw new StreamingException(e.getMessage(), e); - } catch (IOException e) { - throw new StreamingException(e.getMessage(), e); } } + // return the column numbers of the bucketed columns + private List getBucketColIDs(List bucketCols, List cols) { + ArrayList result = new ArrayList(bucketCols.size()); + HashSet bucketSet = new HashSet(bucketCols); + for (int i = 0; i < cols.size(); i++) { + if( bucketSet.contains(cols.get(i).getName()) ) { + result.add(i); + } + } + return result; + } + protected AbstractRecordWriter(HiveEndPoint endPoint) throws ConnectionError, StreamingException { this(endPoint, HiveEndPoint.createHiveConf(AbstractRecordWriter.class, endPoint.metaStoreUri) ); @@ -93,10 +108,23 @@ protected AbstractRecordWriter(HiveEndPoint endPoint) abstract SerDe getSerde() throws SerializationError; + protected abstract ObjectInspector[] getBucketObjectInspectors(); + + protected abstract Object[] getBucketFields(Object row) throws SerializationError; + + // returns the bucket number to which the record belongs to + protected int getBucket(Object row) throws SerializationError { + ObjectInspector[] inspectors = getBucketObjectInspectors(); + Object[] bucketFields = getBucketFields(row); + return ObjectInspectorUtils.getBucketNumber(bucketFields, inspectors, totalBuckets); + } + @Override public void flush() throws StreamingIOFailure { try { - updater.flush(); + for (RecordUpdater updater : updaters) { + updater.flush(); + } } catch (IOException e) { throw new StreamingIOFailure("Unable to flush recordUpdater", e); } @@ -116,9 +144,8 @@ public void clear() throws StreamingIOFailure { public void newBatch(Long minTxnId, Long maxTxnID) throws StreamingIOFailure, SerializationError { try { - this.currentBucketId = rand.nextInt(totalBuckets); LOG.debug("Creating Record updater"); - updater = createRecordUpdater(currentBucketId, minTxnId, maxTxnID); + updaters = createRecordUpdaters(totalBuckets, minTxnId, maxTxnID); } catch (IOException e) { LOG.error("Failed creating record updater", e); throw new StreamingIOFailure("Unable to get new record Updater", e); @@ -128,13 +155,24 @@ public void newBatch(Long minTxnId, Long maxTxnID) @Override public void closeBatch() throws StreamingIOFailure { try { - updater.close(false); - updater = null; + for (RecordUpdater updater : updaters) { + updater.close(false); + } + updaters.clear(); } catch (IOException e) { throw new StreamingIOFailure("Unable to 
close recordUpdater", e); } } + private ArrayList createRecordUpdaters(int bucketCount, Long minTxnId, Long maxTxnID) + throws IOException, SerializationError { + ArrayList result = new ArrayList(bucketCount); + for (int bucket = 0; bucket < bucketCount; bucket++) { + result.add(createRecordUpdater(bucket, minTxnId, maxTxnID) ); + } + return result; + } + private RecordUpdater createRecordUpdater(int bucketId, Long minTxnId, Long maxTxnID) throws IOException, SerializationError { try { @@ -143,16 +181,14 @@ private RecordUpdater createRecordUpdater(int bucketId, Long minTxnId, Long maxT .inspector(getSerde().getObjectInspector()) .bucket(bucketId) .minimumTransactionId(minTxnId) - .maximumTransactionId(maxTxnID) - .statementId(-1) - .finalDestination(partitionPath)); + .maximumTransactionId(maxTxnID)); } catch (SerDeException e) { throw new SerializationError("Failed to get object inspector from Serde " + getSerde().getClass().getName(), e); } } - private Path getPathForEndPoint(IMetaStoreClient msClient, HiveEndPoint endPoint) + private Path getPathForEndPoint(HiveMetaStoreClient msClient, HiveEndPoint endPoint) throws StreamingException { try { String location; diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java index 6dc69f0..71fdad9 100644 --- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java +++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java @@ -26,12 +26,14 @@ import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.serde2.AbstractSerDe; import org.apache.hadoop.hive.serde2.SerDe; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeUtils; -import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; import org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; +import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.io.BytesWritable; import java.io.IOException; @@ -51,7 +53,13 @@ private char serdeSeparator; private int[] fieldToColMapping; private final ArrayList tableColumns; - private AbstractSerDe serde = null; + private LazySimpleSerDe serde = null; + + private final LazySimpleStructObjectInspector recordObjInspector; + private final ObjectInspector[] bucketObjInspectors; + private final StructField[] bucketStructFields; + private Object[] bucketFieldData; // Preallocating in constructor. Updated on each write. 
+ static final private Log LOG = LogFactory.getLog(DelimitedInputWriter.class.getName()); @@ -120,6 +128,24 @@ public DelimitedInputWriter(String[] colNamesForFields, String delimiter, this.reorderingNeeded = isReorderingNeeded(delimiter, getTableColumns()); LOG.debug("Field reordering needed = " + this.reorderingNeeded + ", for endpoint " + endPoint); this.serdeSeparator = serdeSeparator; + this.serde = createSerde(tbl, conf, serdeSeparator); + + // get ObjInspectors for entire record and bucketed cols + try { + recordObjInspector = (LazySimpleStructObjectInspector) serde.getObjectInspector(); + this.bucketObjInspectors = getObjectInspectorsForBucketedCols(bucketIds, recordObjInspector); + } catch (SerDeException e) { + throw new SerializationError("Unable to get ObjectInspector for bucket columns", e); + } + + // get StructFields for bucketed cols + bucketStructFields = new StructField[bucketIds.size()]; + List allFields = recordObjInspector.getAllStructFieldRefs(); + for (int i = 0; i < bucketIds.size(); i++) { + bucketStructFields[i] = allFields.get(bucketIds.get(i)); + } + bucketFieldData = new Object[bucketIds.size()]; + } private boolean isReorderingNeeded(String delimiter, ArrayList tableColumns) { @@ -173,20 +199,20 @@ private static boolean areFieldsInColOrder(int[] fieldToColMapping) { } String[] reorderedFields = new String[getTableColumns().size()]; String decoded = new String(record); - String[] fields = decoded.split(delimiter); + String[] fields = decoded.split(delimiter,-1); for (int i=0; i bucketIds + , LazySimpleStructObjectInspector recordObjInspector) + throws SerializationError { + ObjectInspector[] result = new ObjectInspector[bucketIds.size()]; + + for (int i = 0; i < bucketIds.size(); i++) { + int bucketId = bucketIds.get(i); + result[i] = + recordObjInspector.getAllStructFieldRefs().get( bucketId ).getFieldObjectInspector(); + } + return result; + } + + @Override + protected Object[] getBucketFields(Object row) throws SerializationError { + for (int i = 0; i < bucketIds.size(); i++) { + bucketFieldData[i] = recordObjInspector.getStructFieldData(row, bucketStructFields[i]); + } + return bucketFieldData; + } + + private Object encode(byte[] record) throws SerializationError { try { BytesWritable blob = new BytesWritable(); @@ -244,7 +293,7 @@ private Object encode(byte[] record) throws SerializationError { * @throws SerializationError if serde could not be initialized * @param tbl */ - protected LazySimpleSerDe createSerde(Table tbl, HiveConf conf) + protected static LazySimpleSerDe createSerde(Table tbl, HiveConf conf, char serdeSeparator) throws SerializationError { try { Properties tableProps = MetaStoreUtils.getTableMetadata(tbl); diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java index 6d6beb8..a27a7b6 100644 --- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java +++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java @@ -24,10 +24,14 @@ import org.apache.hadoop.hive.serde2.SerDe; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeUtils; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.io.Text; +import org.apache.hive.hcatalog.data.HCatRecordObjectInspector; import org.apache.hive.hcatalog.data.JsonSerDe; import 
java.io.IOException; +import java.util.List; import java.util.Properties; /** @@ -37,6 +41,11 @@ public class StrictJsonWriter extends AbstractRecordWriter { private JsonSerDe serde; + private final HCatRecordObjectInspector recordObjInspector; + private final ObjectInspector[] bucketObjInspectors; + private final StructField[] bucketStructFields; + private Object[] bucketFieldData; // Preallocating in constructor. Updated on each write. + /** * * @param endPoint the end point to write to @@ -47,6 +56,23 @@ public StrictJsonWriter(HiveEndPoint endPoint) throws ConnectionError, SerializationError, StreamingException { super(endPoint, null); + this.serde = createSerde(tbl, conf); + + // get ObjInspectors for entire record and bucketed cols + try { + recordObjInspector = ( HCatRecordObjectInspector ) serde.getObjectInspector(); + this.bucketObjInspectors = getObjectInspectorsForBucketedCols(bucketIds, recordObjInspector); + } catch (SerDeException e) { + throw new SerializationError("Unable to get ObjectInspector for bucket columns", e); + } + + // get StructFields for bucketed cols + bucketStructFields = new StructField[bucketIds.size()]; + List allFields = recordObjInspector.getAllStructFieldRefs(); + for (int i = 0; i < bucketIds.size(); i++) { + bucketStructFields[i] = allFields.get(bucketIds.get(i)); + } + bucketFieldData = new Object[bucketIds.size()]; } /** @@ -60,23 +86,59 @@ public StrictJsonWriter(HiveEndPoint endPoint) public StrictJsonWriter(HiveEndPoint endPoint, HiveConf conf) throws ConnectionError, SerializationError, StreamingException { super(endPoint, conf); + + // get ObjInspectors for entire record and bucketed cols + try { + recordObjInspector = ( HCatRecordObjectInspector ) serde.getObjectInspector(); + this.bucketObjInspectors = getObjectInspectorsForBucketedCols(bucketIds, recordObjInspector); + } catch (SerDeException e) { + throw new SerializationError("Unable to get ObjectInspector for bucket columns", e); + } + + // get StructFields for bucketed cols + bucketStructFields = new StructField[bucketIds.size()]; + List allFields = recordObjInspector.getAllStructFieldRefs(); + for (int i = 0; i < bucketIds.size(); i++) { + bucketStructFields[i] = allFields.get(bucketIds.get(i)); + } + bucketFieldData = new Object[bucketIds.size()]; } @Override SerDe getSerde() throws SerializationError { - if(serde!=null) { - return serde; - } - serde = createSerde(tbl, conf); return serde; } + protected ObjectInspector[] getBucketObjectInspectors() { + return bucketObjInspectors; + } + + @Override + protected Object[] getBucketFields(Object row) throws SerializationError { + for (int i = 0; i < bucketIds.size(); i++) { + bucketFieldData[i] = recordObjInspector.getStructFieldData(row, bucketStructFields[i]); + } + return bucketFieldData; + } + + private static ObjectInspector[] getObjectInspectorsForBucketedCols(List bucketIds + , HCatRecordObjectInspector recordObjInspector) + throws SerializationError { + ObjectInspector[] result = new ObjectInspector[bucketIds.size()]; + for (int i = 0; i < bucketIds.size(); i++) { + int bucketId = bucketIds.get(i); + result[i] = recordObjInspector.getAllStructFieldRefs().get( bucketId ).getFieldObjectInspector(); + } + return result; + } + @Override public void write(long transactionId, byte[] record) throws StreamingIOFailure, SerializationError { try { Object encodedRow = encode(record); - updater.insert(transactionId, encodedRow); + int bucket = getBucket(encodedRow); + updaters.get(bucket).insert(transactionId, encodedRow); } catch (IOException 
e) { throw new StreamingIOFailure("Error writing record in transaction(" + transactionId + ")", e); diff --git hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java index c28d4aa..720ba12 100644 --- hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java +++ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java @@ -18,33 +18,38 @@ package org.apache.hive.hcatalog.streaming; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RawLocalFileSystem; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hive.cli.CliSessionState; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.IMetaStoreClient; -import org.apache.hadoop.hive.metastore.TableType; -import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; -import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Partition; -import org.apache.hadoop.hive.metastore.api.SerDeInfo; -import org.apache.hadoop.hive.metastore.api.StorageDescriptor; -import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.TxnAbortedException; import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; +import org.apache.hadoop.hive.ql.CommandNeedRetryException; +import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.io.AcidUtils; -import org.apache.hadoop.hive.ql.io.HiveInputFormat; +import org.apache.hadoop.hive.ql.io.orc.OrcFile; import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; -import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat; -import org.apache.hadoop.hive.ql.io.orc.OrcSerde; import org.apache.hadoop.hive.ql.io.orc.OrcStruct; +import org.apache.hadoop.hive.ql.io.orc.Reader; +import org.apache.hadoop.hive.ql.io.orc.RecordReader; +import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; +import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.txn.AcidHouseKeeperService; import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableLongObjectInspector; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; @@ -68,7 +73,6 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.concurrent.TimeUnit; @@ -118,6 +122,7 @@ public FileStatus getFileStatus(Path path) throws IOException { private static final String COL2 = "msg"; private final HiveConf conf; + private Driver driver; private final IMetaStoreClient msClient; final String metaStoreURI = null; @@ -127,13 +132,23 @@ public FileStatus getFileStatus(Path path) throws IOException { private 
final static String tblName = "alerts"; private final static String[] fieldNames = new String[]{COL1,COL2}; List partitionVals; - private static String partLocation; + private static Path partLoc; + private static Path partLoc2; // unpartitioned table - private final static String dbName2 = "testing"; + private final static String dbName2 = "testing2"; private final static String tblName2 = "alerts"; private final static String[] fieldNames2 = new String[]{COL1,COL2}; + + // for bucket join testing + private final static String dbName3 = "testing3"; + private final static String tblName3 = "dimensionTable"; + private final static String dbName4 = "testing4"; + private final static String tblName4 = "factTable"; + List partitionVals2; + + private final String PART1_CONTINENT = "Asia"; private final String PART1_COUNTRY = "India"; @@ -146,14 +161,21 @@ public TestStreaming() throws Exception { partitionVals.add(PART1_CONTINENT); partitionVals.add(PART1_COUNTRY); + partitionVals2 = new ArrayList(1); + partitionVals2.add(PART1_COUNTRY); + + conf = new HiveConf(this.getClass()); conf.set("fs.raw.impl", RawFileSystem.class.getName()); + conf.set("hive.enforce.bucketing", "true"); TxnDbUtil.setConfValues(conf); if (metaStoreURI!=null) { conf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreURI); } conf.setBoolVar(HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI, true); conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true); + dbFolder.create(); + //1) Start from a clean slate (metastore) TxnDbUtil.cleanDb(); @@ -165,17 +187,37 @@ public TestStreaming() throws Exception { @Before public void setup() throws Exception { + SessionState.start(new CliSessionState(conf)); + driver = new Driver(conf); + driver.setMaxRows(200002);//make sure Driver returns all results // drop and recreate the necessary databases and tables dropDB(msClient, dbName); - createDbAndTable(msClient, dbName, tblName, partitionVals); + + String[] colNames = new String[] {COL1, COL2}; + String[] colTypes = new String[] {serdeConstants.INT_TYPE_NAME, serdeConstants.STRING_TYPE_NAME}; + String[] bucketCols = new String[] {COL1}; +// partLoc = createDbAndTable(msClient, dbName, tblName, partitionVals); + String loc1 = dbFolder.newFolder(dbName + ".db").toString(); + String[] partNames = new String[]{"Continent", "Country"}; + partLoc = createDbAndTable(driver, dbName, tblName, partitionVals, colNames, colTypes, bucketCols, partNames, loc1, 1); dropDB(msClient, dbName2); - createDbAndTable(msClient, dbName2, tblName2, partitionVals); +// partLoc2 = createDbAndTable(msClient, dbName2, tblName2, partitionVals); + String loc2 = dbFolder.newFolder(dbName2 + ".db").toString(); + partLoc2 = createDbAndTable(driver, dbName2, tblName2, partitionVals, colNames, colTypes, bucketCols, partNames, loc2, 2); + + String loc3 = dbFolder.newFolder("testing5.db").toString(); + createStoreSales("testing5", loc3); + + runDDL(driver, "drop table testBucketing3.streamedtable"); + runDDL(driver, "drop table testBucketing3.finaltable"); + runDDL(driver, "drop table testBucketing3.nobucket"); } @After public void cleanup() throws Exception { msClient.close(); + driver.close(); } private static List getPartitionKeys() { @@ -186,10 +228,174 @@ public void cleanup() throws Exception { return fields; } - private void checkDataWritten(long minTxn, long maxTxn, int buckets, int numExpectedFiles, + private void createStoreSales(String dbName, String loc) throws Exception { + String dbUri = "raw://" + new Path(loc).toUri().toString(); + String tableLoc = dbUri 
+ Path.SEPARATOR + "store_sales"; + + boolean success = runDDL(driver, "create database IF NOT EXISTS " + dbName + " location '" + dbUri + "'"); + Assert.assertTrue(success); + success = runDDL(driver, "use " + dbName); + Assert.assertTrue(success); + + success = runDDL(driver, "drop table if exists store_sales"); + Assert.assertTrue(success); + success = runDDL(driver, "create table store_sales\n" + + "(\n" + + " ss_sold_date_sk int,\n" + + " ss_sold_time_sk int,\n" + + " ss_item_sk int,\n" + + " ss_customer_sk int,\n" + + " ss_cdemo_sk int,\n" + + " ss_hdemo_sk int,\n" + + " ss_addr_sk int,\n" + + " ss_store_sk int,\n" + + " ss_promo_sk int,\n" + + " ss_ticket_number int,\n" + + " ss_quantity int,\n" + + " ss_wholesale_cost decimal(7,2),\n" + + " ss_list_price decimal(7,2),\n" + + " ss_sales_price decimal(7,2),\n" + + " ss_ext_discount_amt decimal(7,2),\n" + + " ss_ext_sales_price decimal(7,2),\n" + + " ss_ext_wholesale_cost decimal(7,2),\n" + + " ss_ext_list_price decimal(7,2),\n" + + " ss_ext_tax decimal(7,2),\n" + + " ss_coupon_amt decimal(7,2),\n" + + " ss_net_paid decimal(7,2),\n" + + " ss_net_paid_inc_tax decimal(7,2),\n" + + " ss_net_profit decimal(7,2)\n" + + ")\n" + + " partitioned by (dt string)\n" + + "clustered by (ss_store_sk, ss_promo_sk)\n" + + "INTO 4 BUCKETS stored as orc " + " location '" + tableLoc + "'" + " TBLPROPERTIES ('orc.compress'='NONE', 'transactional'='true')"); + Assert.assertTrue(success); + + success = runDDL(driver, "alter table store_sales add partition(dt='2015')"); + Assert.assertTrue(success); + } + /** + * make sure it works with table where bucket col is not 1st col + * @throws Exception + */ + @Test + public void testBucketingWhereBucketColIsNotFirstCol() throws Exception { + List partitionVals = new ArrayList(); + partitionVals.add("2015"); + HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testing5", "store_sales", partitionVals); + DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"ss_sold_date_sk","ss_sold_time_sk", "ss_item_sk", + "ss_customer_sk", "ss_cdemo_sk", "ss_hdemo_sk", "ss_addr_sk", "ss_store_sk", "ss_promo_sk", "ss_ticket_number", "ss_quantity", + "ss_wholesale_cost", "ss_list_price", "ss_sales_price", "ss_ext_discount_amt", "ss_ext_sales_price", "ss_ext_wholesale_cost", + "ss_ext_list_price", "ss_ext_tax", "ss_coupon_amt", "ss_net_paid", "ss_net_paid_inc_tax", "ss_net_profit"},",", endPt); + StreamingConnection connection = endPt.newConnection(false, null);//should this really be null? 
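// ---------------------------------------------------------------------------
// Editor's aside: illustrative sketch, not part of the patch.
// This test exercises a table whose bucket columns are not the leading columns:
// store_sales is clustered by (ss_store_sk, ss_promo_sk), the 8th and 9th
// declared columns. The writer copes with that by resolving bucket-column
// positions by name, as getBucketColIDs() does in AbstractRecordWriter. A
// stand-alone version of that lookup (hypothetical helper name, plain Strings
// instead of FieldSchema) could look like this:
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class BucketColumnIndex {
  static List<Integer> bucketColumnIndices(List<String> tableColumns, List<String> bucketColumns) {
    Set<String> bucketSet = new HashSet<String>(bucketColumns);
    List<Integer> result = new ArrayList<Integer>(bucketColumns.size());
    for (int i = 0; i < tableColumns.size(); i++) {
      if (bucketSet.contains(tableColumns.get(i))) {
        result.add(i);  // ordinal of a bucketed column within the full row
      }
    }
    return result;
  }

  public static void main(String[] args) {
    List<String> cols = Arrays.asList("ss_sold_date_sk", "ss_sold_time_sk", "ss_item_sk",
        "ss_customer_sk", "ss_cdemo_sk", "ss_hdemo_sk", "ss_addr_sk", "ss_store_sk", "ss_promo_sk");
    // prints [7, 8]: the positions of ss_store_sk and ss_promo_sk
    System.out.println(bucketColumnIndices(cols, Arrays.asList("ss_store_sk", "ss_promo_sk")));
  }
}
// ---------------------------------------------------------------------------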
+ + TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer); + txnBatch.beginNextTransaction(); + + StringBuilder row = new StringBuilder(); + for(int i = 0; i < 10; i++) { + for(int ints = 0; ints < 11; ints++) { + row.append(ints).append(','); + } + for(int decs = 0; decs < 12; decs++) { + row.append(i + 0.1).append(','); + } + row.setLength(row.length() - 1); + txnBatch.write(row.toString().getBytes()); + } + txnBatch.commit(); + txnBatch.close(); + connection.close(); + + driver.run("set hive.execution.engine=tez"); + ArrayList res = queryTable(driver, "select row__id.bucketid, * from testing5.store_sales"); + for (String re : res) { + System.out.println(re); + } + } + + + // stream data into streaming table with N buckets, then copy the data into another bucketed table + // check if bucketing in both was done in the same way + @Test + public void testStreamBucketingMatchesRegularBucketing() throws Exception { + int bucketCount = 100; + + String dbUri = "raw://" + new Path(dbFolder.newFolder().toString()).toUri().toString(); + String tableLoc = "'" + dbUri + Path.SEPARATOR + "streamedtable" + "'"; + String tableLoc2 = "'" + dbUri + Path.SEPARATOR + "finaltable" + "'"; + String tableLoc3 = "'" + dbUri + Path.SEPARATOR + "nobucket" + "'"; + + runDDL(driver, "create database testBucketing3"); + runDDL(driver, "use testBucketing3"); + runDDL(driver, "set hive.enforce.bucketing = true"); + runDDL(driver, "create table streamedtable ( key1 string,key2 int,data string ) clustered by ( key1,key2 ) into " + + bucketCount + " buckets stored as orc location " + tableLoc + " TBLPROPERTIES ('transactional'='true')") ; +// In 'nobucket' table we capture bucketid from streamedtable to workaround a hive bug that prevents joins two identically bucketed tables + runDDL(driver, "create table nobucket ( bucketid int, key1 string,key2 int,data string ) location " + tableLoc3) ; + runDDL(driver, "create table finaltable ( bucketid int, key1 string,key2 int,data string ) clustered by ( key1,key2 ) into " + + bucketCount + " buckets stored as orc location " + tableLoc2 + " TBLPROPERTIES ('transactional'='true')"); + + + String[] records = new String[] { + "PSFAHYLZVC,29,EPNMA", + "PPPRKWAYAU,96,VUTEE", + "MIAOFERCHI,3,WBDSI", + "CEGQAZOWVN,0,WCUZL", + "XWAKMNSVQF,28,YJVHU", + "XBWTSAJWME,2,KDQFO", + "FUVLQTAXAY,5,LDSDG", + "QTQMDJMGJH,6,QBOMA", + "EFLOTLWJWN,71,GHWPS", + "PEQNAOJHCM,82,CAAFI", + "MOEKQLGZCP,41,RUACR", + "QZXMCOPTID,37,LFLWE", + "EYALVWICRD,13,JEZLC", + "VYWLZAYTXX,16,DMVZX", + "OSALYSQIXR,47,HNZVE", + "JGKVHKCEGQ,25,KSCJB", + "WQFMMYDHET,12,DTRWA", + "AJOVAYZKZQ,15,YBKFO", + "YAQONWCUAU,31,QJNHZ", + "DJBXUEUOEB,35,IYCBL" + }; + + HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testBucketing3", "streamedtable", null); + String[] colNames1 = new String[] { "key1", "key2", "data" }; + DelimitedInputWriter wr = new DelimitedInputWriter(colNames1,",", endPt); + StreamingConnection connection = endPt.newConnection(false); + + TransactionBatch txnBatch = connection.fetchTransactionBatch(2, wr); + txnBatch.beginNextTransaction(); + + for (String record : records) { + txnBatch.write(record.toString().getBytes()); + } + + txnBatch.commit(); + txnBatch.close(); + connection.close(); + + driver.run("set hive.execution.engine=tez"); + ArrayList res1 = queryTable(driver, "select row__id.bucketid, * from streamedtable order by key2"); + for (String re : res1) { + System.out.println(re); + } + + driver.run("insert into nobucket select row__id.bucketid,* from streamedtable"); + 
runDDL(driver, "set hive.enforce.bucketing = true"); + runDDL(driver, " insert into finaltable select * from nobucket"); + ArrayList res2 = queryTable(driver, "select row__id.bucketid,* from finaltable where row__id.bucketid<>bucketid"); + for (String s : res2) { + LOG.error(s); + } + Assert.assertTrue(res2.isEmpty()); + } + + + private void checkDataWritten(Path partitionPath, long minTxn, long maxTxn, int buckets, int numExpectedFiles, String... records) throws Exception { ValidTxnList txns = msClient.getValidTxns(); - AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(partLocation), conf, txns); + AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, txns); Assert.assertEquals(0, dir.getObsolete().size()); Assert.assertEquals(0, dir.getOriginalFiles().size()); List current = dir.getCurrentDirectories(); @@ -197,7 +403,7 @@ private void checkDataWritten(long minTxn, long maxTxn, int buckets, int numExpe for (AcidUtils.ParsedDelta pd : current) System.out.println(pd.getPath().toString()); Assert.assertEquals(numExpectedFiles, current.size()); - // find the absolute mininum transaction + // find the absolute minimum transaction long min = Long.MAX_VALUE; long max = Long.MIN_VALUE; for (AcidUtils.ParsedDelta pd : current) { @@ -209,11 +415,11 @@ private void checkDataWritten(long minTxn, long maxTxn, int buckets, int numExpe InputFormat inf = new OrcInputFormat(); JobConf job = new JobConf(); - job.set("mapred.input.dir", partLocation.toString()); + job.set("mapred.input.dir", partitionPath.toString()); job.set("bucket_count", Integer.toString(buckets)); job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString()); - InputSplit[] splits = inf.getSplits(job, 1); - Assert.assertEquals(1, splits.length); + InputSplit[] splits = inf.getSplits(job, buckets); + Assert.assertEquals(buckets, splits.length); org.apache.hadoop.mapred.RecordReader rr = inf.getRecordReader(splits[0], job, Reporter.NULL); @@ -226,9 +432,9 @@ private void checkDataWritten(long minTxn, long maxTxn, int buckets, int numExpe Assert.assertEquals(false, rr.next(key, value)); } - private void checkNothingWritten() throws Exception { + private void checkNothingWritten(Path partitionPath) throws Exception { ValidTxnList txns = msClient.getValidTxns(); - AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(partLocation), conf, txns); + AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, txns); Assert.assertEquals(0, dir.getObsolete().size()); Assert.assertEquals(0, dir.getOriginalFiles().size()); List current = dir.getCurrentDirectories(); @@ -398,7 +604,7 @@ public void testTransactionBatchCommit_Delimited() throws Exception { txnBatch.write("1,Hello streaming".getBytes()); txnBatch.commit(); - checkDataWritten(1, 10, 1, 1, "{1, Hello streaming}"); + checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}"); Assert.assertEquals(TransactionBatch.TxnState.COMMITTED , txnBatch.getCurrentTransactionState()); @@ -410,11 +616,11 @@ public void testTransactionBatchCommit_Delimited() throws Exception { txnBatch.write("2,Welcome to streaming".getBytes()); // data should not be visible - checkDataWritten(1, 10, 1, 1, "{1, Hello streaming}"); + checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}"); txnBatch.commit(); - checkDataWritten(1, 10, 1, 1, "{1, Hello streaming}", + checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}", "{2, Welcome to streaming}"); txnBatch.close(); @@ -459,7 +665,7 @@ public void testTransactionBatchCommit_Json() throws Exception { 
txnBatch.write(rec1.getBytes()); txnBatch.commit(); - checkDataWritten(1, 10, 1, 1, "{1, Hello streaming}"); + checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}"); Assert.assertEquals(TransactionBatch.TxnState.COMMITTED , txnBatch.getCurrentTransactionState()); @@ -518,7 +724,7 @@ public void testRemainingTransactions() throws Exception { , txnBatch.getCurrentTransactionState()); ++batch; } - Assert.assertEquals(0,txnBatch.remainingTransactions()); + Assert.assertEquals(0, txnBatch.remainingTransactions()); txnBatch.close(); Assert.assertEquals(TransactionBatch.TxnState.INACTIVE @@ -542,7 +748,7 @@ public void testTransactionBatchAbort() throws Exception { txnBatch.write("2,Welcome to streaming".getBytes()); txnBatch.abort(); - checkNothingWritten(); + checkNothingWritten(partLoc); Assert.assertEquals(TransactionBatch.TxnState.ABORTED , txnBatch.getCurrentTransactionState()); @@ -550,7 +756,7 @@ public void testTransactionBatchAbort() throws Exception { txnBatch.close(); connection.close(); - checkNothingWritten(); + checkNothingWritten(partLoc); } @@ -569,7 +775,7 @@ public void testTransactionBatchAbortAndCommit() throws Exception { txnBatch.write("2,Welcome to streaming".getBytes()); txnBatch.abort(); - checkNothingWritten(); + checkNothingWritten(partLoc); Assert.assertEquals(TransactionBatch.TxnState.ABORTED , txnBatch.getCurrentTransactionState()); @@ -579,8 +785,8 @@ public void testTransactionBatchAbortAndCommit() throws Exception { txnBatch.write("2,Welcome to streaming".getBytes()); txnBatch.commit(); - checkDataWritten(1, 10, 1, 1, "{1, Hello streaming}", - "{2, Welcome to streaming}"); + checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}", + "{2, Welcome to streaming}"); txnBatch.close(); connection.close(); @@ -598,14 +804,14 @@ public void testMultipleTransactionBatchCommits() throws Exception { txnBatch.write("1,Hello streaming".getBytes()); txnBatch.commit(); - checkDataWritten(1, 10, 1, 1, "{1, Hello streaming}"); + checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}"); txnBatch.beginNextTransaction(); txnBatch.write("2,Welcome to streaming".getBytes()); txnBatch.commit(); - checkDataWritten(1, 10, 1, 1, "{1, Hello streaming}", - "{2, Welcome to streaming}"); + checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}", + "{2, Welcome to streaming}"); txnBatch.close(); @@ -615,16 +821,16 @@ public void testMultipleTransactionBatchCommits() throws Exception { txnBatch.write("3,Hello streaming - once again".getBytes()); txnBatch.commit(); - checkDataWritten(1, 20, 1, 2, "{1, Hello streaming}", - "{2, Welcome to streaming}", "{3, Hello streaming - once again}"); + checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}", + "{2, Welcome to streaming}", "{3, Hello streaming - once again}"); txnBatch.beginNextTransaction(); txnBatch.write("4,Welcome to streaming - once again".getBytes()); txnBatch.commit(); - checkDataWritten(1, 20, 1, 2, "{1, Hello streaming}", - "{2, Welcome to streaming}", "{3, Hello streaming - once again}", - "{4, Welcome to streaming - once again}"); + checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}", + "{2, Welcome to streaming}", "{3, Hello streaming - once again}", + "{4, Welcome to streaming - once again}"); Assert.assertEquals(TransactionBatch.TxnState.COMMITTED , txnBatch.getCurrentTransactionState()); @@ -655,15 +861,15 @@ public void testInterleavedTransactionBatchCommits() throws Exception { txnBatch1.write("1,Hello streaming".getBytes()); txnBatch2.write("3,Hello streaming - once 
again".getBytes()); - checkNothingWritten(); + checkNothingWritten(partLoc); txnBatch2.commit(); - checkDataWritten(11, 20, 1, 1, "{3, Hello streaming - once again}"); + checkDataWritten(partLoc, 11, 20, 1, 1, "{3, Hello streaming - once again}"); txnBatch1.commit(); - checkDataWritten(1, 20, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}"); + checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}"); txnBatch1.beginNextTransaction(); txnBatch1.write("2,Welcome to streaming".getBytes()); @@ -671,17 +877,17 @@ public void testInterleavedTransactionBatchCommits() throws Exception { txnBatch2.beginNextTransaction(); txnBatch2.write("4,Welcome to streaming - once again".getBytes()); - checkDataWritten(1, 20, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}"); + checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}"); txnBatch1.commit(); - checkDataWritten(1, 20, 1, 2, "{1, Hello streaming}", + checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}", "{2, Welcome to streaming}", "{3, Hello streaming - once again}"); txnBatch2.commit(); - checkDataWritten(1, 20, 1, 2, "{1, Hello streaming}", + checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}", "{2, Welcome to streaming}", "{3, Hello streaming - once again}", "{4, Welcome to streaming - once again}"); @@ -772,6 +978,164 @@ public void testConcurrentTransactionBatchCommits() throws Exception { } } + + private ArrayList dumpBucket(Path orcFile) throws IOException { + org.apache.hadoop.fs.FileSystem fs = org.apache.hadoop.fs.FileSystem.getLocal(new Configuration()); + Reader reader = OrcFile.createReader(orcFile, + OrcFile.readerOptions(conf).filesystem(fs)); + + RecordReader rows = reader.rows(null); + StructObjectInspector inspector = (StructObjectInspector) reader + .getObjectInspector(); + + System.out.format("Found Bucket File : %s \n", orcFile.getName()); + ArrayList result = new ArrayList(); + while (rows.hasNext()) { + Object row = rows.next(null); + SampleRec rec = (SampleRec) deserializeDeltaFileRow(row, inspector)[5]; + result.add(rec); + } + + return result; + } + + // Assumes stored data schema = [acid fields],string,int,string + // return array of 6 fields, where the last field has the actual data + private static Object[] deserializeDeltaFileRow(Object row, StructObjectInspector inspector) { + List fields = inspector.getAllStructFieldRefs(); + + WritableIntObjectInspector f0ins = (WritableIntObjectInspector) fields.get(0).getFieldObjectInspector(); + WritableLongObjectInspector f1ins = (WritableLongObjectInspector) fields.get(1).getFieldObjectInspector(); + WritableIntObjectInspector f2ins = (WritableIntObjectInspector) fields.get(2).getFieldObjectInspector(); + WritableLongObjectInspector f3ins = (WritableLongObjectInspector) fields.get(3).getFieldObjectInspector(); + WritableLongObjectInspector f4ins = (WritableLongObjectInspector) fields.get(4).getFieldObjectInspector(); + StructObjectInspector f5ins = (StructObjectInspector) fields.get(5).getFieldObjectInspector(); + + int f0 = f0ins.get(inspector.getStructFieldData(row, fields.get(0))); + long f1 = f1ins.get(inspector.getStructFieldData(row, fields.get(1))); + int f2 = f2ins.get(inspector.getStructFieldData(row, fields.get(2))); + long f3 = f3ins.get(inspector.getStructFieldData(row, fields.get(3))); + long f4 = f4ins.get(inspector.getStructFieldData(row, fields.get(4))); + SampleRec f5 = deserializeInner(inspector.getStructFieldData(row, 
fields.get(5)), f5ins); + + return new Object[] {f0, f1, f2, f3, f4, f5}; + } + + // Assumes row schema => string,int,string + private static SampleRec deserializeInner(Object row, StructObjectInspector inspector) { + List fields = inspector.getAllStructFieldRefs(); + + WritableStringObjectInspector f0ins = (WritableStringObjectInspector) fields.get(0).getFieldObjectInspector(); + WritableIntObjectInspector f1ins = (WritableIntObjectInspector) fields.get(1).getFieldObjectInspector(); + WritableStringObjectInspector f2ins = (WritableStringObjectInspector) fields.get(2).getFieldObjectInspector(); + + String f0 = f0ins.getPrimitiveJavaObject(inspector.getStructFieldData(row, fields.get(0))); + int f1 = f1ins.get(inspector.getStructFieldData(row, fields.get(1))); + String f2 = f2ins.getPrimitiveJavaObject(inspector.getStructFieldData(row, fields.get(2))); + return new SampleRec(f0, f1, f2); + } + + @Test + public void testBucketing() throws Exception { + dropDB(msClient, dbName3); + dropDB(msClient, dbName4); + + // 1) Create two bucketed tables + String dbLocation = dbFolder.newFolder(dbName3).getCanonicalPath() + ".db"; + dbLocation = dbLocation.replaceAll("\\\\","/"); // for windows paths + String[] colNames = "key1,key2,data".split(","); + String[] colTypes = "string,int,string".split(","); + String[] bucketNames = "key1,key2".split(","); + int bucketCount = 4; + createDbAndTable(driver, dbName3, tblName3, null, colNames, colTypes, bucketNames + , null, dbLocation, bucketCount); + + String dbLocation2 = dbFolder.newFolder(dbName4).getCanonicalPath() + ".db"; + dbLocation2 = dbLocation2.replaceAll("\\\\","/"); // for windows paths + String[] colNames2 = "key3,key4,data2".split(","); + String[] colTypes2 = "string,int,string".split(","); + String[] bucketNames2 = "key3,key4".split(","); + createDbAndTable(driver, dbName4, tblName4, null, colNames2, colTypes2, bucketNames2 + , null, dbLocation2, bucketCount); + + + // 2) Insert data into both tables + HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName3, tblName3, null); + DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt); + StreamingConnection connection = endPt.newConnection(false); + + TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer); + txnBatch.beginNextTransaction(); + txnBatch.write("name0,1,Hello streaming".getBytes()); + txnBatch.write("name2,2,Welcome to streaming".getBytes()); + txnBatch.write("name4,2,more Streaming unlimited".getBytes()); + txnBatch.write("name5,2,even more Streaming unlimited".getBytes()); + txnBatch.commit(); + + + HiveEndPoint endPt2 = new HiveEndPoint(metaStoreURI, dbName4, tblName4, null); + DelimitedInputWriter writer2 = new DelimitedInputWriter(colNames2,",", endPt2); + StreamingConnection connection2 = endPt2.newConnection(false); + TransactionBatch txnBatch2 = connection2.fetchTransactionBatch(2, writer2); + txnBatch2.beginNextTransaction(); + + txnBatch2.write("name5,2,fact3".getBytes()); // bucket 0 + txnBatch2.write("name8,2,fact3".getBytes()); // bucket 1 + txnBatch2.write("name0,1,fact1".getBytes()); // bucket 2 + // no data for bucket 3 -- expect 0 length bucket file + + + txnBatch2.commit(); + + // 3 Check data distribution in buckets + + HashMap> actual1 = dumpAllBuckets(dbLocation, tblName3); + HashMap> actual2 = dumpAllBuckets(dbLocation2, tblName4); + System.err.println("\n Table 1"); + System.err.println(actual1); + System.err.println("\n Table 2"); + System.err.println(actual2); + + // assert bucket listing is as expected + 
Assert.assertEquals("number of buckets does not match expectation", actual1.values().size(), 4); + Assert.assertEquals("records in bucket does not match expectation", actual1.get(0).size(), 2); + Assert.assertEquals("records in bucket does not match expectation", actual1.get(1).size(), 1); + Assert.assertEquals("records in bucket does not match expectation", actual1.get(2).size(), 0); + Assert.assertEquals("records in bucket does not match expectation", actual1.get(3).size(), 1); + + + } + + + // assumes un partitioned table + // returns a map > + private HashMap> dumpAllBuckets(String dbLocation, String tableName) + throws IOException { + HashMap> result = new HashMap>(); + + for (File deltaDir : new File(dbLocation + "/" + tableName).listFiles()) { + if(!deltaDir.getName().startsWith("delta")) + continue; + File[] bucketFiles = deltaDir.listFiles(); + for (File bucketFile : bucketFiles) { + if(bucketFile.toString().endsWith("length")) + continue; + Integer bucketNum = getBucketNumber(bucketFile); + ArrayList recs = dumpBucket(new Path(bucketFile.toString())); + result.put(bucketNum, recs); + } + } + return result; + } + + //assumes bucket_NNNNN format of file name + private Integer getBucketNumber(File bucketFile) { + String fname = bucketFile.getName(); + int start = fname.indexOf('_'); + String number = fname.substring(start+1, fname.length()); + return Integer.parseInt(number); + } + // delete db and all tables in it public static void dropDB(IMetaStoreClient client, String databaseName) { try { @@ -784,69 +1148,139 @@ public static void dropDB(IMetaStoreClient client, String databaseName) { } - public void createDbAndTable(IMetaStoreClient client, String databaseName, - String tableName, List partVals) + + + ///////// -------- UTILS ------- ///////// + // returns Path of the partition created (if any) else Path of table + public static Path createDbAndTable(Driver driver, String databaseName, + String tableName, List partVals, + String[] colNames, String[] colTypes, + String[] bucketCols, + String[] partNames, String dbLocation, int bucketCount) throws Exception { - Database db = new Database(); - db.setName(databaseName); - String dbLocation = "raw://" + dbFolder.newFolder(databaseName + ".db").toURI().getPath(); - db.setLocationUri(dbLocation); - client.createDatabase(db); - - Table tbl = new Table(); - tbl.setDbName(databaseName); - tbl.setTableName(tableName); - tbl.setTableType(TableType.MANAGED_TABLE.toString()); - StorageDescriptor sd = new StorageDescriptor(); - sd.setCols(getTableColumns()); - sd.setNumBuckets(1); - sd.setLocation(dbLocation + Path.SEPARATOR + tableName); - tbl.setPartitionKeys(getPartitionKeys()); - - tbl.setSd(sd); - - sd.setBucketCols(new ArrayList(2)); - sd.setSerdeInfo(new SerDeInfo()); - sd.getSerdeInfo().setName(tbl.getTableName()); - sd.getSerdeInfo().setParameters(new HashMap()); - sd.getSerdeInfo().getParameters().put(serdeConstants.SERIALIZATION_FORMAT, "1"); - - sd.getSerdeInfo().setSerializationLib(OrcSerde.class.getName()); - sd.setInputFormat(HiveInputFormat.class.getName()); - sd.setOutputFormat(OrcOutputFormat.class.getName()); - - Map tableParams = new HashMap(); - tbl.setParameters(tableParams); - client.createTable(tbl); - try { - addPartition(client, tbl, partVals); - } catch (AlreadyExistsException e) { + String dbUri = "raw://" + new Path(dbLocation).toUri().toString(); + String tableLoc = dbUri + Path.SEPARATOR + tableName; + + runDDL(driver, "create database IF NOT EXISTS " + databaseName + " location '" + dbUri + "'"); + 
runDDL(driver, "use " + databaseName); + String crtTbl = "create table " + tableName + + " ( " + getTableColumnsStr(colNames,colTypes) + " )" + + getPartitionStmtStr(partNames) + + " clustered by ( " + join(bucketCols, ",") + " )" + + " into " + bucketCount + " buckets " + + " stored as orc " + + " location '" + tableLoc + "'"; + runDDL(driver, crtTbl); + if(partNames!=null && partNames.length!=0) { + return addPartition(driver, tableName, partVals, partNames); } - Partition createdPartition = client.getPartition(databaseName, tableName, partVals); - partLocation = createdPartition.getSd().getLocation(); + return new Path(tableLoc); + } + + private static Path addPartition(Driver driver, String tableName, List partVals, String[] partNames) throws QueryFailedException, CommandNeedRetryException, IOException { + String partSpec = getPartsSpec(partNames, partVals); + String addPart = "alter table " + tableName + " add partition ( " + partSpec + " )"; + runDDL(driver, addPart); + return getPartitionPath(driver, tableName, partSpec); } - private static void addPartition(IMetaStoreClient client, Table tbl - , List partValues) - throws IOException, TException { - Partition part = new Partition(); - part.setDbName(tbl.getDbName()); - part.setTableName(tblName); - StorageDescriptor sd = new StorageDescriptor(tbl.getSd()); - sd.setLocation(sd.getLocation() + Path.SEPARATOR + makePartPath(tbl.getPartitionKeys() - , partValues)); - part.setSd(sd); - part.setValues(partValues); - client.add_partition(part); + private static Path getPartitionPath(Driver driver, String tableName, String partSpec) throws CommandNeedRetryException, IOException { + ArrayList res = queryTable(driver, "describe extended " + tableName + " PARTITION (" + partSpec + ")"); + String partInfo = res.get(res.size() - 1); + int start = partInfo.indexOf("location:") + "location:".length(); + int end = partInfo.indexOf(",",start); + return new Path( partInfo.substring(start,end) ); + } + + private static String getTableColumnsStr(String[] colNames, String[] colTypes) { + StringBuffer sb = new StringBuffer(); + for (int i=0; i < colNames.length; ++i) { + sb.append(colNames[i] + " " + colTypes[i]); + if (i partVals) { + StringBuffer sb = new StringBuffer(); + for (int i=0; i < partVals.size(); ++i) { + sb.append(partNames[i] + " = '" + partVals.get(i) + "'"); + if(i < partVals.size()-1) { + sb.append(","); + } + } + return sb.toString(); + } + + private static String join(String[] values, String delimiter) { + if(values==null) + return null; + StringBuffer strbuf = new StringBuffer(); + + boolean first = true; + + for (Object value : values) { + if (!first) { strbuf.append(delimiter); } else { first = false; } + strbuf.append(value.toString()); + } + + return strbuf.toString(); + } + private static String getPartitionStmtStr(String[] partNames) { + if ( partNames == null || partNames.length == 0) { + return ""; + } + return " partitioned by (" + getTablePartsStr(partNames) + " )"; + } + + private static boolean runDDL(Driver driver, String sql) throws QueryFailedException { + LOG.debug(sql); + System.out.println(sql); + int retryCount = 1; // # of times to retry if first attempt fails + for (int attempt=0; attempt <= retryCount; ++attempt) { + try { + //LOG.debug("Running Hive Query: "+ sql); + CommandProcessorResponse cpr = driver.run(sql); + if(cpr.getResponseCode() == 0) { + return true; + } + LOG.error("Statement: " + sql + " failed: " + cpr); + } catch (CommandNeedRetryException e) { + if (attempt == retryCount) { + throw new 
QueryFailedException(sql, e); + } + continue; + } + } // for + return false; + } + + private static String makePartPath(List partKeys, List partVals) { if (partKeys.size()!=partVals.size()) { throw new IllegalArgumentException("Partition values:" + partVals + ", does not match the partition Keys in table :" + partKeys ); } - StringBuilder buff = new StringBuilder(partKeys.size()*20); + StringBuffer buff = new StringBuffer(partKeys.size()*20); buff.append(" ( "); int i=0; for (FieldSchema schema : partKeys) { @@ -870,4 +1304,56 @@ private static String makePartPath(List partKeys, List part fields.add(new FieldSchema(COL2, serdeConstants.STRING_TYPE_NAME, "")); return fields; } + + public static ArrayList queryTable(Driver driver, String query) + throws CommandNeedRetryException, IOException { + driver.run(query); + ArrayList res = new ArrayList(); + driver.getResults(res); + if(res.isEmpty()) + System.err.println(driver.getErrorMsg()); + return res; + } + + private static class SampleRec { + public String field1; + public int field2; + public String field3; + + public SampleRec(String field1, int field2, String field3) { + this.field1 = field1; + this.field2 = field2; + this.field3 = field3; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + SampleRec that = (SampleRec) o; + + if (field2 != that.field2) return false; + if (field1 != null ? !field1.equals(that.field1) : that.field1 != null) return false; + return !(field3 != null ? !field3.equals(that.field3) : that.field3 != null); + + } + + @Override + public int hashCode() { + int result = field1 != null ? field1.hashCode() : 0; + result = 31 * result + field2; + result = 31 * result + (field3 != null ? 
field3.hashCode() : 0); + return result; + } + + @Override + public String toString() { + return " { " + + "'" + field1 + '\'' + + "," + field2 + + ",'" + field3 + '\'' + + " }"; + } + } } diff --git serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java index 00a6384..54ae48e 100644 --- serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java +++ serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java @@ -494,6 +494,35 @@ public static String getObjectInspectorName(ObjectInspector oi) { } } + /** + * Computes the bucket number to which the bucketFields belong + * @param bucketFields the bucketed fields of the row + * @param bucketFieldInspectors the ObjectInspectors for each of the bucketed fields + * @param totalBuckets the number of buckets in the table + * @return the bucket number + */ + public static int getBucketNumber(Object[] bucketFields, ObjectInspector[] bucketFieldInspectors, int totalBuckets) { + int hashCode = getBucketHashCode(bucketFields, bucketFieldInspectors); + int bucketID = (hashCode & Integer.MAX_VALUE) % totalBuckets; + return bucketID; + } + + /** + * Computes the hash code for the given bucketed fields + * @param bucketFields the bucketed fields of the row + * @param bucketFieldInspectors the ObjectInspectors for each of the bucketed fields + * @return the combined hash code of the bucketed fields + */ + private static int getBucketHashCode(Object[] bucketFields, ObjectInspector[] bucketFieldInspectors) { + int hashCode = 0; + for (int i = 0; i < bucketFields.length; i++) { + int fieldHash = ObjectInspectorUtils.hashCode(bucketFields[i], bucketFieldInspectors[i]); + hashCode = 31 * hashCode + fieldHash; + } + return hashCode; + } + + public static int hashCode(Object o, ObjectInspector objIns) { if (o == null) { return 0;
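// ---------------------------------------------------------------------------
// Editor's aside: illustrative sketch, not part of the patch.
// getBucketNumber() above folds the per-field hashes together with the usual
// 31 * h + fieldHash scheme and then maps the combined hash into
// [0, totalBuckets) via (hashCode & Integer.MAX_VALUE) % totalBuckets; the mask
// clears the sign bit so a negative hash can never yield a negative bucket id.
// The demo below uses plain Object.hashCode() per field rather than
// ObjectInspectorUtils.hashCode(field, inspector), so its bucket ids are not
// Hive's, but the mechanics are the same.
public class BucketNumberDemo {
  static int bucketNumber(Object[] bucketFields, int totalBuckets) {
    int hashCode = 0;
    for (Object field : bucketFields) {
      int fieldHash = (field == null) ? 0 : field.hashCode();  // Hive uses the ObjectInspector-aware hash here
      hashCode = 31 * hashCode + fieldHash;
    }
    return (hashCode & Integer.MAX_VALUE) % totalBuckets;
  }

  public static void main(String[] args) {
    // e.g. a row from the testBucketing() table, clustered by (key1, key2) into 4 buckets
    Object[] bucketFields = new Object[] {"name0", 1};
    System.out.println("bucket = " + bucketNumber(bucketFields, 4));  // an id in 0..3
  }
}
// ---------------------------------------------------------------------------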