diff --git hcatalog/streaming/pom.xml hcatalog/streaming/pom.xml
index 6d03ce1..ba9f731 100644
--- hcatalog/streaming/pom.xml
+++ hcatalog/streaming/pom.xml
@@ -104,6 +104,13 @@
<scope>test</scope>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-common</artifactId>
+      <scope>test</scope>
+      <version>${hadoop-23.version}</version>
+    </dependency>
+
diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
index c959222..3814010 100644
--- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
+++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
@@ -22,9 +22,10 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Table;
@@ -32,12 +33,19 @@
import org.apache.hadoop.hive.ql.io.RecordUpdater;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hive.hcatalog.common.HCatUtil;
import org.apache.thrift.TException;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
import java.util.Random;
abstract class AbstractRecordWriter implements RecordWriter {
@@ -47,12 +55,12 @@
final HiveEndPoint endPoint;
final Table tbl;
- final IMetaStoreClient msClient;
- RecordUpdater updater = null;
+ final HiveMetaStoreClient msClient;
+ protected final List<Integer> bucketIds;
+ ArrayList<RecordUpdater> updaters = null;
+
+ public final int totalBuckets;
- private final int totalBuckets;
- private Random rand = new Random();
- private int currentBucketId = 0;
private final Path partitionPath;
final AcidOutputFormat<?,?> outf;
@@ -63,16 +71,17 @@ protected AbstractRecordWriter(HiveEndPoint endPoint, HiveConf conf)
this.conf = conf!=null ? conf
: HiveEndPoint.createHiveConf(DelimitedInputWriter.class, endPoint.metaStoreUri);
try {
- msClient = HCatUtil.getHiveMetastoreClient(this.conf);
+ msClient = new HiveMetaStoreClient(this.conf);
this.tbl = msClient.getTable(endPoint.database, endPoint.table);
this.partitionPath = getPathForEndPoint(msClient, endPoint);
this.totalBuckets = tbl.getSd().getNumBuckets();
+ this.bucketIds = getBucketColIDs(tbl.getSd().getBucketCols(), tbl.getSd().getCols()) ;
if(totalBuckets <= 0) {
throw new StreamingException("Cannot stream to table that has not been bucketed : "
+ endPoint);
}
String outFormatName = this.tbl.getSd().getOutputFormat();
- outf = (AcidOutputFormat<?,?>) ReflectionUtils.newInstance(JavaUtils.loadClass(outFormatName), conf);
+ outf = (AcidOutputFormat<?,?>) ReflectionUtils.newInstance(Class.forName(outFormatName), conf);
} catch (MetaException e) {
throw new ConnectionError(endPoint, e);
} catch (NoSuchObjectException e) {
@@ -81,11 +90,21 @@ protected AbstractRecordWriter(HiveEndPoint endPoint, HiveConf conf)
throw new StreamingException(e.getMessage(), e);
} catch (ClassNotFoundException e) {
throw new StreamingException(e.getMessage(), e);
- } catch (IOException e) {
- throw new StreamingException(e.getMessage(), e);
}
}
+ // return the column numbers of the bucketed columns
+ private List<Integer> getBucketColIDs(List<String> bucketCols, List<FieldSchema> cols) {
+ ArrayList<Integer> result = new ArrayList<Integer>(bucketCols.size());
+ HashSet<String> bucketSet = new HashSet<String>(bucketCols);
+ for (int i = 0; i < cols.size(); i++) {
+ if( bucketSet.contains(cols.get(i).getName()) ) {
+ result.add(i);
+ }
+ }
+ return result;
+ }
+
protected AbstractRecordWriter(HiveEndPoint endPoint)
throws ConnectionError, StreamingException {
this(endPoint, HiveEndPoint.createHiveConf(AbstractRecordWriter.class, endPoint.metaStoreUri) );
@@ -93,10 +112,23 @@ protected AbstractRecordWriter(HiveEndPoint endPoint)
abstract SerDe getSerde() throws SerializationError;
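+ // ObjectInspectors for just the bucketed columns, in the same order as bucketIds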
+ protected abstract ObjectInspector[] getBucketObjectInspectors();
+
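+ // Extracts only the values of the bucketed columns from an encoded row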
+ protected abstract Object[] getBucketFields(Object row) throws SerializationError;
+
+ // returns the bucket number to which the record belongs
+ protected int getBucket(Object row) throws SerializationError {
+ ObjectInspector[] inspectors = getBucketObjectInspectors();
+ Object[] bucketFields = getBucketFields(row);
+ return ObjectInspectorUtils.getBucketNumber(bucketFields, inspectors, totalBuckets);
+ }
+
@Override
public void flush() throws StreamingIOFailure {
try {
- updater.flush();
+ for (RecordUpdater updater : updaters) {
+ updater.flush();
+ }
} catch (IOException e) {
throw new StreamingIOFailure("Unable to flush recordUpdater", e);
}
@@ -116,9 +148,8 @@ public void clear() throws StreamingIOFailure {
public void newBatch(Long minTxnId, Long maxTxnID)
throws StreamingIOFailure, SerializationError {
try {
- this.currentBucketId = rand.nextInt(totalBuckets);
LOG.debug("Creating Record updater");
- updater = createRecordUpdater(currentBucketId, minTxnId, maxTxnID);
+ updaters = createRecordUpdaters(totalBuckets, minTxnId, maxTxnID);
} catch (IOException e) {
LOG.error("Failed creating record updater", e);
throw new StreamingIOFailure("Unable to get new record Updater", e);
@@ -128,13 +159,24 @@ public void newBatch(Long minTxnId, Long maxTxnID)
@Override
public void closeBatch() throws StreamingIOFailure {
try {
- updater.close(false);
- updater = null;
+ for (RecordUpdater updater : updaters) {
+ updater.close(false);
+ }
+ updaters.clear();
} catch (IOException e) {
throw new StreamingIOFailure("Unable to close recordUpdater", e);
}
}
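+ // One RecordUpdater is kept per bucket so each record can be written to the delta file of its computed bucket.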
+ private ArrayList<RecordUpdater> createRecordUpdaters(int bucketCount, Long minTxnId, Long maxTxnID)
+ throws IOException, SerializationError {
+ ArrayList<RecordUpdater> result = new ArrayList<RecordUpdater>(bucketCount);
+ for (int bucket = 0; bucket < bucketCount; bucket++) {
+ result.add(createRecordUpdater(bucket, minTxnId, maxTxnID) );
+ }
+ return result;
+ }
+
private RecordUpdater createRecordUpdater(int bucketId, Long minTxnId, Long maxTxnID)
throws IOException, SerializationError {
try {
@@ -143,16 +185,14 @@ private RecordUpdater createRecordUpdater(int bucketId, Long minTxnId, Long maxT
.inspector(getSerde().getObjectInspector())
.bucket(bucketId)
.minimumTransactionId(minTxnId)
- .maximumTransactionId(maxTxnID)
- .statementId(-1)
- .finalDestination(partitionPath));
+ .maximumTransactionId(maxTxnID));
} catch (SerDeException e) {
throw new SerializationError("Failed to get object inspector from Serde "
+ getSerde().getClass().getName(), e);
}
}
- private Path getPathForEndPoint(IMetaStoreClient msClient, HiveEndPoint endPoint)
+ private Path getPathForEndPoint(HiveMetaStoreClient msClient, HiveEndPoint endPoint)
throws StreamingException {
try {
String location;
diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java
index 6dc69f0..71fdad9 100644
--- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java
+++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/DelimitedInputWriter.java
@@ -26,12 +26,14 @@
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeUtils;
-import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
import org.apache.hadoop.hive.serde2.lazy.LazySerDeParameters;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.io.BytesWritable;
import java.io.IOException;
@@ -51,7 +53,13 @@
private char serdeSeparator;
private int[] fieldToColMapping;
private final ArrayList<String> tableColumns;
- private AbstractSerDe serde = null;
+ private LazySimpleSerDe serde = null;
+
+ private final LazySimpleStructObjectInspector recordObjInspector;
+ private final ObjectInspector[] bucketObjInspectors;
+ private final StructField[] bucketStructFields;
+ private Object[] bucketFieldData; // Preallocating in constructor. Updated on each write.
+
static final private Log LOG = LogFactory.getLog(DelimitedInputWriter.class.getName());
@@ -120,6 +128,24 @@ public DelimitedInputWriter(String[] colNamesForFields, String delimiter,
this.reorderingNeeded = isReorderingNeeded(delimiter, getTableColumns());
LOG.debug("Field reordering needed = " + this.reorderingNeeded + ", for endpoint " + endPoint);
this.serdeSeparator = serdeSeparator;
+ this.serde = createSerde(tbl, conf, serdeSeparator);
+
+ // get ObjInspectors for entire record and bucketed cols
+ try {
+ recordObjInspector = (LazySimpleStructObjectInspector) serde.getObjectInspector();
+ this.bucketObjInspectors = getObjectInspectorsForBucketedCols(bucketIds, recordObjInspector);
+ } catch (SerDeException e) {
+ throw new SerializationError("Unable to get ObjectInspector for bucket columns", e);
+ }
+
+ // get StructFields for bucketed cols
+ bucketStructFields = new StructField[bucketIds.size()];
+ List<? extends StructField> allFields = recordObjInspector.getAllStructFieldRefs();
+ for (int i = 0; i < bucketIds.size(); i++) {
+ bucketStructFields[i] = allFields.get(bucketIds.get(i));
+ }
+ bucketFieldData = new Object[bucketIds.size()];
+
}
private boolean isReorderingNeeded(String delimiter, ArrayList<String> tableColumns) {
@@ -173,20 +199,20 @@ private static boolean areFieldsInColOrder(int[] fieldToColMapping) {
}
String[] reorderedFields = new String[getTableColumns().size()];
String decoded = new String(record);
- String[] fields = decoded.split(delimiter);
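+ // split with limit -1 so trailing empty fields are preserved and the column count stays stable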
+ String[] fields = decoded.split(delimiter,-1);
for (int i=0; i<fields.length; i++) {
+ private static ObjectInspector[] getObjectInspectorsForBucketedCols(List<Integer> bucketIds
+ , LazySimpleStructObjectInspector recordObjInspector)
+ throws SerializationError {
+ ObjectInspector[] result = new ObjectInspector[bucketIds.size()];
+
+ for (int i = 0; i < bucketIds.size(); i++) {
+ int bucketId = bucketIds.get(i);
+ result[i] =
+ recordObjInspector.getAllStructFieldRefs().get( bucketId ).getFieldObjectInspector();
+ }
+ return result;
+ }
+
+ @Override
+ protected Object[] getBucketFields(Object row) throws SerializationError {
+ for (int i = 0; i < bucketIds.size(); i++) {
+ bucketFieldData[i] = recordObjInspector.getStructFieldData(row, bucketStructFields[i]);
+ }
+ return bucketFieldData;
+ }
+
+
private Object encode(byte[] record) throws SerializationError {
try {
BytesWritable blob = new BytesWritable();
@@ -244,7 +293,7 @@ private Object encode(byte[] record) throws SerializationError {
* @throws SerializationError if serde could not be initialized
* @param tbl
*/
- protected LazySimpleSerDe createSerde(Table tbl, HiveConf conf)
+ protected static LazySimpleSerDe createSerde(Table tbl, HiveConf conf, char serdeSeparator)
throws SerializationError {
try {
Properties tableProps = MetaStoreUtils.getTableMetadata(tbl);
diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java
index 6d6beb8..04f9d7b 100644
--- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java
+++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/StrictJsonWriter.java
@@ -24,10 +24,17 @@
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.io.Text;
+import org.apache.hive.hcatalog.data.HCatRecordObjectInspector;
import org.apache.hive.hcatalog.data.JsonSerDe;
import java.io.IOException;
+import java.util.List;
import java.util.Properties;
/**
@@ -37,6 +44,11 @@
public class StrictJsonWriter extends AbstractRecordWriter {
private JsonSerDe serde;
+ private final HCatRecordObjectInspector recordObjInspector;
+ private final ObjectInspector[] bucketObjInspectors;
+ private final StructField[] bucketStructFields;
+ private Object[] bucketFieldData; // Preallocating in constructor. Updated on each write.
+
/**
*
* @param endPoint the end point to write to
@@ -47,6 +59,23 @@
public StrictJsonWriter(HiveEndPoint endPoint)
throws ConnectionError, SerializationError, StreamingException {
super(endPoint, null);
+ this.serde = createSerde(tbl, conf);
+
+ // get ObjInspectors for entire record and bucketed cols
+ try {
+ recordObjInspector = ( HCatRecordObjectInspector ) serde.getObjectInspector();
+ this.bucketObjInspectors = getObjectInspectorsForBucketedCols(bucketIds, recordObjInspector);
+ } catch (SerDeException e) {
+ throw new SerializationError("Unable to get ObjectInspector for bucket columns", e);
+ }
+
+ // get StructFields for bucketed cols
+ bucketStructFields = new StructField[bucketIds.size()];
+ List<? extends StructField> allFields = recordObjInspector.getAllStructFieldRefs();
+ for (int i = 0; i < bucketIds.size(); i++) {
+ bucketStructFields[i] = allFields.get(bucketIds.get(i));
+ }
+ bucketFieldData = new Object[bucketIds.size()];
}
/**
@@ -60,23 +89,59 @@ public StrictJsonWriter(HiveEndPoint endPoint)
public StrictJsonWriter(HiveEndPoint endPoint, HiveConf conf)
throws ConnectionError, SerializationError, StreamingException {
super(endPoint, conf);
+
+ // get ObjInspectors for entire record and bucketed cols
+ try {
+ recordObjInspector = ( HCatRecordObjectInspector ) serde.getObjectInspector();
+ this.bucketObjInspectors = getObjectInspectorsForBucketedCols(bucketIds, recordObjInspector);
+ } catch (SerDeException e) {
+ throw new SerializationError("Unable to get ObjectInspector for bucket columns", e);
+ }
+
+ // get StructFields for bucketed cols
+ bucketStructFields = new StructField[bucketIds.size()];
+ List<? extends StructField> allFields = recordObjInspector.getAllStructFieldRefs();
+ for (int i = 0; i < bucketIds.size(); i++) {
+ bucketStructFields[i] = allFields.get(bucketIds.get(i));
+ }
+ bucketFieldData = new Object[bucketIds.size()];
}
@Override
SerDe getSerde() throws SerializationError {
- if(serde!=null) {
- return serde;
- }
- serde = createSerde(tbl, conf);
return serde;
}
+ protected ObjectInspector[] getBucketObjectInspectors() {
+ return bucketObjInspectors;
+ }
+
+ @Override
+ protected Object[] getBucketFields(Object row) throws SerializationError {
+ for (int i = 0; i < bucketIds.size(); i++) {
+ bucketFieldData[i] = recordObjInspector.getStructFieldData(row, bucketStructFields[i]);
+ }
+ return bucketFieldData;
+ }
+
+ private static ObjectInspector[] getObjectInspectorsForBucketedCols(List<Integer> bucketIds
+ , HCatRecordObjectInspector recordObjInspector)
+ throws SerializationError {
+ ObjectInspector[] result = new ObjectInspector[bucketIds.size()];
+ for (int i = 0; i < bucketIds.size(); i++) {
+ int bucketId = bucketIds.get(i);
+ result[i] = recordObjInspector.getAllStructFieldRefs().get( bucketId ).getFieldObjectInspector();
+ }
+ return result;
+ }
+
@Override
public void write(long transactionId, byte[] record)
throws StreamingIOFailure, SerializationError {
try {
Object encodedRow = encode(record);
- updater.insert(transactionId, encodedRow);
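+ // route the record to the updater of its computed bucket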
+ int bucket = getBucket(encodedRow);
+ updaters.get(bucket).insert(transactionId, encodedRow);
} catch (IOException e) {
throw new StreamingIOFailure("Error writing record in transaction("
+ transactionId + ")", e);
diff --git hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index c28d4aa..720ba12 100644
--- hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -18,33 +18,38 @@
package org.apache.hive.hcatalog.streaming;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.TableType;
-import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
-import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.SerDeInfo;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
-import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
-import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
+import org.apache.hadoop.hive.ql.io.orc.Reader;
+import org.apache.hadoop.hive.ql.io.orc.RecordReader;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.txn.AcidHouseKeeperService;
import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableLongObjectInspector;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
@@ -68,7 +73,6 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -118,6 +122,7 @@ public FileStatus getFileStatus(Path path) throws IOException {
private static final String COL2 = "msg";
private final HiveConf conf;
+ private Driver driver;
private final IMetaStoreClient msClient;
final String metaStoreURI = null;
@@ -127,13 +132,23 @@ public FileStatus getFileStatus(Path path) throws IOException {
private final static String tblName = "alerts";
private final static String[] fieldNames = new String[]{COL1,COL2};
List<String> partitionVals;
- private static String partLocation;
+ private static Path partLoc;
+ private static Path partLoc2;
// unpartitioned table
- private final static String dbName2 = "testing";
+ private final static String dbName2 = "testing2";
private final static String tblName2 = "alerts";
private final static String[] fieldNames2 = new String[]{COL1,COL2};
+
+ // for bucket join testing
+ private final static String dbName3 = "testing3";
+ private final static String tblName3 = "dimensionTable";
+ private final static String dbName4 = "testing4";
+ private final static String tblName4 = "factTable";
+ List<String> partitionVals2;
+
+
private final String PART1_CONTINENT = "Asia";
private final String PART1_COUNTRY = "India";
@@ -146,14 +161,21 @@ public TestStreaming() throws Exception {
partitionVals.add(PART1_CONTINENT);
partitionVals.add(PART1_COUNTRY);
+ partitionVals2 = new ArrayList<String>(1);
+ partitionVals2.add(PART1_COUNTRY);
+
+
conf = new HiveConf(this.getClass());
conf.set("fs.raw.impl", RawFileSystem.class.getName());
+ conf.set("hive.enforce.bucketing", "true");
TxnDbUtil.setConfValues(conf);
if (metaStoreURI!=null) {
conf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreURI);
}
conf.setBoolVar(HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI, true);
conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
+ dbFolder.create();
+
//1) Start from a clean slate (metastore)
TxnDbUtil.cleanDb();
@@ -165,17 +187,37 @@ public TestStreaming() throws Exception {
@Before
public void setup() throws Exception {
+ SessionState.start(new CliSessionState(conf));
+ driver = new Driver(conf);
+ driver.setMaxRows(200002);//make sure Driver returns all results
// drop and recreate the necessary databases and tables
dropDB(msClient, dbName);
- createDbAndTable(msClient, dbName, tblName, partitionVals);
+
+ String[] colNames = new String[] {COL1, COL2};
+ String[] colTypes = new String[] {serdeConstants.INT_TYPE_NAME, serdeConstants.STRING_TYPE_NAME};
+ String[] bucketCols = new String[] {COL1};
+// partLoc = createDbAndTable(msClient, dbName, tblName, partitionVals);
+ String loc1 = dbFolder.newFolder(dbName + ".db").toString();
+ String[] partNames = new String[]{"Continent", "Country"};
+ partLoc = createDbAndTable(driver, dbName, tblName, partitionVals, colNames, colTypes, bucketCols, partNames, loc1, 1);
dropDB(msClient, dbName2);
- createDbAndTable(msClient, dbName2, tblName2, partitionVals);
+// partLoc2 = createDbAndTable(msClient, dbName2, tblName2, partitionVals);
+ String loc2 = dbFolder.newFolder(dbName2 + ".db").toString();
+ partLoc2 = createDbAndTable(driver, dbName2, tblName2, partitionVals, colNames, colTypes, bucketCols, partNames, loc2, 2);
+
+ String loc3 = dbFolder.newFolder("testing5.db").toString();
+ createStoreSales("testing5", loc3);
+
+ runDDL(driver, "drop table testBucketing3.streamedtable");
+ runDDL(driver, "drop table testBucketing3.finaltable");
+ runDDL(driver, "drop table testBucketing3.nobucket");
}
@After
public void cleanup() throws Exception {
msClient.close();
+ driver.close();
}
private static List<FieldSchema> getPartitionKeys() {
@@ -186,10 +228,174 @@ public void cleanup() throws Exception {
return fields;
}
- private void checkDataWritten(long minTxn, long maxTxn, int buckets, int numExpectedFiles,
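+ // Creates a store_sales table clustered on (ss_store_sk, ss_promo_sk), i.e. bucket columns that are not the leading columns.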
+ private void createStoreSales(String dbName, String loc) throws Exception {
+ String dbUri = "raw://" + new Path(loc).toUri().toString();
+ String tableLoc = dbUri + Path.SEPARATOR + "store_sales";
+
+ boolean success = runDDL(driver, "create database IF NOT EXISTS " + dbName + " location '" + dbUri + "'");
+ Assert.assertTrue(success);
+ success = runDDL(driver, "use " + dbName);
+ Assert.assertTrue(success);
+
+ success = runDDL(driver, "drop table if exists store_sales");
+ Assert.assertTrue(success);
+ success = runDDL(driver, "create table store_sales\n" +
+ "(\n" +
+ " ss_sold_date_sk int,\n" +
+ " ss_sold_time_sk int,\n" +
+ " ss_item_sk int,\n" +
+ " ss_customer_sk int,\n" +
+ " ss_cdemo_sk int,\n" +
+ " ss_hdemo_sk int,\n" +
+ " ss_addr_sk int,\n" +
+ " ss_store_sk int,\n" +
+ " ss_promo_sk int,\n" +
+ " ss_ticket_number int,\n" +
+ " ss_quantity int,\n" +
+ " ss_wholesale_cost decimal(7,2),\n" +
+ " ss_list_price decimal(7,2),\n" +
+ " ss_sales_price decimal(7,2),\n" +
+ " ss_ext_discount_amt decimal(7,2),\n" +
+ " ss_ext_sales_price decimal(7,2),\n" +
+ " ss_ext_wholesale_cost decimal(7,2),\n" +
+ " ss_ext_list_price decimal(7,2),\n" +
+ " ss_ext_tax decimal(7,2),\n" +
+ " ss_coupon_amt decimal(7,2),\n" +
+ " ss_net_paid decimal(7,2),\n" +
+ " ss_net_paid_inc_tax decimal(7,2),\n" +
+ " ss_net_profit decimal(7,2)\n" +
+ ")\n" +
+ " partitioned by (dt string)\n" +
+ "clustered by (ss_store_sk, ss_promo_sk)\n" +
+ "INTO 4 BUCKETS stored as orc " + " location '" + tableLoc + "'" + " TBLPROPERTIES ('orc.compress'='NONE', 'transactional'='true')");
+ Assert.assertTrue(success);
+
+ success = runDDL(driver, "alter table store_sales add partition(dt='2015')");
+ Assert.assertTrue(success);
+ }
+ /**
+ * make sure it works with table where bucket col is not 1st col
+ * @throws Exception
+ */
+ @Test
+ public void testBucketingWhereBucketColIsNotFirstCol() throws Exception {
+ List<String> partitionVals = new ArrayList<String>();
+ partitionVals.add("2015");
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testing5", "store_sales", partitionVals);
+ DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"ss_sold_date_sk","ss_sold_time_sk", "ss_item_sk",
+ "ss_customer_sk", "ss_cdemo_sk", "ss_hdemo_sk", "ss_addr_sk", "ss_store_sk", "ss_promo_sk", "ss_ticket_number", "ss_quantity",
+ "ss_wholesale_cost", "ss_list_price", "ss_sales_price", "ss_ext_discount_amt", "ss_ext_sales_price", "ss_ext_wholesale_cost",
+ "ss_ext_list_price", "ss_ext_tax", "ss_coupon_amt", "ss_net_paid", "ss_net_paid_inc_tax", "ss_net_profit"},",", endPt);
+ StreamingConnection connection = endPt.newConnection(false, null);//should this really be null?
+
+ TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer);
+ txnBatch.beginNextTransaction();
+
+ StringBuilder row = new StringBuilder();
+ for(int i = 0; i < 10; i++) {
+ for(int ints = 0; ints < 11; ints++) {
+ row.append(ints).append(',');
+ }
+ for(int decs = 0; decs < 12; decs++) {
+ row.append(i + 0.1).append(',');
+ }
+ row.setLength(row.length() - 1);
+ txnBatch.write(row.toString().getBytes());
+ }
+ txnBatch.commit();
+ txnBatch.close();
+ connection.close();
+
+ driver.run("set hive.execution.engine=tez");
+ ArrayList<String> res = queryTable(driver, "select row__id.bucketid, * from testing5.store_sales");
+ for (String re : res) {
+ System.out.println(re);
+ }
+ }
+
+
+ // stream data into streaming table with N buckets, then copy the data into another bucketed table
+ // check if bucketing in both was done in the same way
+ @Test
+ public void testStreamBucketingMatchesRegularBucketing() throws Exception {
+ int bucketCount = 100;
+
+ String dbUri = "raw://" + new Path(dbFolder.newFolder().toString()).toUri().toString();
+ String tableLoc = "'" + dbUri + Path.SEPARATOR + "streamedtable" + "'";
+ String tableLoc2 = "'" + dbUri + Path.SEPARATOR + "finaltable" + "'";
+ String tableLoc3 = "'" + dbUri + Path.SEPARATOR + "nobucket" + "'";
+
+ runDDL(driver, "create database testBucketing3");
+ runDDL(driver, "use testBucketing3");
+ runDDL(driver, "set hive.enforce.bucketing = true");
+ runDDL(driver, "create table streamedtable ( key1 string,key2 int,data string ) clustered by ( key1,key2 ) into "
+ + bucketCount + " buckets stored as orc location " + tableLoc + " TBLPROPERTIES ('transactional'='true')") ;
+// In the 'nobucket' table we capture bucketid from streamedtable to work around a hive bug that prevents joining two identically bucketed tables
+ runDDL(driver, "create table nobucket ( bucketid int, key1 string,key2 int,data string ) location " + tableLoc3) ;
+ runDDL(driver, "create table finaltable ( bucketid int, key1 string,key2 int,data string ) clustered by ( key1,key2 ) into "
+ + bucketCount + " buckets stored as orc location " + tableLoc2 + " TBLPROPERTIES ('transactional'='true')");
+
+
+ String[] records = new String[] {
+ "PSFAHYLZVC,29,EPNMA",
+ "PPPRKWAYAU,96,VUTEE",
+ "MIAOFERCHI,3,WBDSI",
+ "CEGQAZOWVN,0,WCUZL",
+ "XWAKMNSVQF,28,YJVHU",
+ "XBWTSAJWME,2,KDQFO",
+ "FUVLQTAXAY,5,LDSDG",
+ "QTQMDJMGJH,6,QBOMA",
+ "EFLOTLWJWN,71,GHWPS",
+ "PEQNAOJHCM,82,CAAFI",
+ "MOEKQLGZCP,41,RUACR",
+ "QZXMCOPTID,37,LFLWE",
+ "EYALVWICRD,13,JEZLC",
+ "VYWLZAYTXX,16,DMVZX",
+ "OSALYSQIXR,47,HNZVE",
+ "JGKVHKCEGQ,25,KSCJB",
+ "WQFMMYDHET,12,DTRWA",
+ "AJOVAYZKZQ,15,YBKFO",
+ "YAQONWCUAU,31,QJNHZ",
+ "DJBXUEUOEB,35,IYCBL"
+ };
+
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, "testBucketing3", "streamedtable", null);
+ String[] colNames1 = new String[] { "key1", "key2", "data" };
+ DelimitedInputWriter wr = new DelimitedInputWriter(colNames1,",", endPt);
+ StreamingConnection connection = endPt.newConnection(false);
+
+ TransactionBatch txnBatch = connection.fetchTransactionBatch(2, wr);
+ txnBatch.beginNextTransaction();
+
+ for (String record : records) {
+ txnBatch.write(record.toString().getBytes());
+ }
+
+ txnBatch.commit();
+ txnBatch.close();
+ connection.close();
+
+ driver.run("set hive.execution.engine=tez");
+ ArrayList<String> res1 = queryTable(driver, "select row__id.bucketid, * from streamedtable order by key2");
+ for (String re : res1) {
+ System.out.println(re);
+ }
+
+ driver.run("insert into nobucket select row__id.bucketid,* from streamedtable");
+ runDDL(driver, "set hive.enforce.bucketing = true");
+ runDDL(driver, " insert into finaltable select * from nobucket");
+ ArrayList<String> res2 = queryTable(driver, "select row__id.bucketid,* from finaltable where row__id.bucketid<>bucketid");
+ for (String s : res2) {
+ LOG.error(s);
+ }
+ Assert.assertTrue(res2.isEmpty());
+ }
+
+
+ private void checkDataWritten(Path partitionPath, long minTxn, long maxTxn, int buckets, int numExpectedFiles,
String... records) throws Exception {
ValidTxnList txns = msClient.getValidTxns();
- AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(partLocation), conf, txns);
+ AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, txns);
Assert.assertEquals(0, dir.getObsolete().size());
Assert.assertEquals(0, dir.getOriginalFiles().size());
List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
@@ -197,7 +403,7 @@ private void checkDataWritten(long minTxn, long maxTxn, int buckets, int numExpe
for (AcidUtils.ParsedDelta pd : current) System.out.println(pd.getPath().toString());
Assert.assertEquals(numExpectedFiles, current.size());
- // find the absolute mininum transaction
+ // find the absolute minimum transaction
long min = Long.MAX_VALUE;
long max = Long.MIN_VALUE;
for (AcidUtils.ParsedDelta pd : current) {
@@ -209,11 +415,11 @@ private void checkDataWritten(long minTxn, long maxTxn, int buckets, int numExpe
InputFormat inf = new OrcInputFormat();
JobConf job = new JobConf();
- job.set("mapred.input.dir", partLocation.toString());
+ job.set("mapred.input.dir", partitionPath.toString());
job.set("bucket_count", Integer.toString(buckets));
job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString());
- InputSplit[] splits = inf.getSplits(job, 1);
- Assert.assertEquals(1, splits.length);
+ InputSplit[] splits = inf.getSplits(job, buckets);
+ Assert.assertEquals(buckets, splits.length);
org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr =
inf.getRecordReader(splits[0], job, Reporter.NULL);
@@ -226,9 +432,9 @@ private void checkDataWritten(long minTxn, long maxTxn, int buckets, int numExpe
Assert.assertEquals(false, rr.next(key, value));
}
- private void checkNothingWritten() throws Exception {
+ private void checkNothingWritten(Path partitionPath) throws Exception {
ValidTxnList txns = msClient.getValidTxns();
- AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(partLocation), conf, txns);
+ AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, txns);
Assert.assertEquals(0, dir.getObsolete().size());
Assert.assertEquals(0, dir.getOriginalFiles().size());
List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
@@ -398,7 +604,7 @@ public void testTransactionBatchCommit_Delimited() throws Exception {
txnBatch.write("1,Hello streaming".getBytes());
txnBatch.commit();
- checkDataWritten(1, 10, 1, 1, "{1, Hello streaming}");
+ checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
, txnBatch.getCurrentTransactionState());
@@ -410,11 +616,11 @@ public void testTransactionBatchCommit_Delimited() throws Exception {
txnBatch.write("2,Welcome to streaming".getBytes());
// data should not be visible
- checkDataWritten(1, 10, 1, 1, "{1, Hello streaming}");
+ checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
txnBatch.commit();
- checkDataWritten(1, 10, 1, 1, "{1, Hello streaming}",
+ checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
"{2, Welcome to streaming}");
txnBatch.close();
@@ -459,7 +665,7 @@ public void testTransactionBatchCommit_Json() throws Exception {
txnBatch.write(rec1.getBytes());
txnBatch.commit();
- checkDataWritten(1, 10, 1, 1, "{1, Hello streaming}");
+ checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
, txnBatch.getCurrentTransactionState());
@@ -518,7 +724,7 @@ public void testRemainingTransactions() throws Exception {
, txnBatch.getCurrentTransactionState());
++batch;
}
- Assert.assertEquals(0,txnBatch.remainingTransactions());
+ Assert.assertEquals(0, txnBatch.remainingTransactions());
txnBatch.close();
Assert.assertEquals(TransactionBatch.TxnState.INACTIVE
@@ -542,7 +748,7 @@ public void testTransactionBatchAbort() throws Exception {
txnBatch.write("2,Welcome to streaming".getBytes());
txnBatch.abort();
- checkNothingWritten();
+ checkNothingWritten(partLoc);
Assert.assertEquals(TransactionBatch.TxnState.ABORTED
, txnBatch.getCurrentTransactionState());
@@ -550,7 +756,7 @@ public void testTransactionBatchAbort() throws Exception {
txnBatch.close();
connection.close();
- checkNothingWritten();
+ checkNothingWritten(partLoc);
}
@@ -569,7 +775,7 @@ public void testTransactionBatchAbortAndCommit() throws Exception {
txnBatch.write("2,Welcome to streaming".getBytes());
txnBatch.abort();
- checkNothingWritten();
+ checkNothingWritten(partLoc);
Assert.assertEquals(TransactionBatch.TxnState.ABORTED
, txnBatch.getCurrentTransactionState());
@@ -579,8 +785,8 @@ public void testTransactionBatchAbortAndCommit() throws Exception {
txnBatch.write("2,Welcome to streaming".getBytes());
txnBatch.commit();
- checkDataWritten(1, 10, 1, 1, "{1, Hello streaming}",
- "{2, Welcome to streaming}");
+ checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
+ "{2, Welcome to streaming}");
txnBatch.close();
connection.close();
@@ -598,14 +804,14 @@ public void testMultipleTransactionBatchCommits() throws Exception {
txnBatch.write("1,Hello streaming".getBytes());
txnBatch.commit();
- checkDataWritten(1, 10, 1, 1, "{1, Hello streaming}");
+ checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}");
txnBatch.beginNextTransaction();
txnBatch.write("2,Welcome to streaming".getBytes());
txnBatch.commit();
- checkDataWritten(1, 10, 1, 1, "{1, Hello streaming}",
- "{2, Welcome to streaming}");
+ checkDataWritten(partLoc, 1, 10, 1, 1, "{1, Hello streaming}",
+ "{2, Welcome to streaming}");
txnBatch.close();
@@ -615,16 +821,16 @@ public void testMultipleTransactionBatchCommits() throws Exception {
txnBatch.write("3,Hello streaming - once again".getBytes());
txnBatch.commit();
- checkDataWritten(1, 20, 1, 2, "{1, Hello streaming}",
- "{2, Welcome to streaming}", "{3, Hello streaming - once again}");
+ checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}",
+ "{2, Welcome to streaming}", "{3, Hello streaming - once again}");
txnBatch.beginNextTransaction();
txnBatch.write("4,Welcome to streaming - once again".getBytes());
txnBatch.commit();
- checkDataWritten(1, 20, 1, 2, "{1, Hello streaming}",
- "{2, Welcome to streaming}", "{3, Hello streaming - once again}",
- "{4, Welcome to streaming - once again}");
+ checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}",
+ "{2, Welcome to streaming}", "{3, Hello streaming - once again}",
+ "{4, Welcome to streaming - once again}");
Assert.assertEquals(TransactionBatch.TxnState.COMMITTED
, txnBatch.getCurrentTransactionState());
@@ -655,15 +861,15 @@ public void testInterleavedTransactionBatchCommits() throws Exception {
txnBatch1.write("1,Hello streaming".getBytes());
txnBatch2.write("3,Hello streaming - once again".getBytes());
- checkNothingWritten();
+ checkNothingWritten(partLoc);
txnBatch2.commit();
- checkDataWritten(11, 20, 1, 1, "{3, Hello streaming - once again}");
+ checkDataWritten(partLoc, 11, 20, 1, 1, "{3, Hello streaming - once again}");
txnBatch1.commit();
- checkDataWritten(1, 20, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}");
+ checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}");
txnBatch1.beginNextTransaction();
txnBatch1.write("2,Welcome to streaming".getBytes());
@@ -671,17 +877,17 @@ public void testInterleavedTransactionBatchCommits() throws Exception {
txnBatch2.beginNextTransaction();
txnBatch2.write("4,Welcome to streaming - once again".getBytes());
- checkDataWritten(1, 20, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}");
+ checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}", "{3, Hello streaming - once again}");
txnBatch1.commit();
- checkDataWritten(1, 20, 1, 2, "{1, Hello streaming}",
+ checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}",
"{2, Welcome to streaming}",
"{3, Hello streaming - once again}");
txnBatch2.commit();
- checkDataWritten(1, 20, 1, 2, "{1, Hello streaming}",
+ checkDataWritten(partLoc, 1, 20, 1, 2, "{1, Hello streaming}",
"{2, Welcome to streaming}",
"{3, Hello streaming - once again}",
"{4, Welcome to streaming - once again}");
@@ -772,6 +978,164 @@ public void testConcurrentTransactionBatchCommits() throws Exception {
}
}
+
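+ // Reads all rows from a single ORC bucket file and returns the user columns (field 5 of each ACID row) as SampleRec objects.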
+ private ArrayList<SampleRec> dumpBucket(Path orcFile) throws IOException {
+ org.apache.hadoop.fs.FileSystem fs = org.apache.hadoop.fs.FileSystem.getLocal(new Configuration());
+ Reader reader = OrcFile.createReader(orcFile,
+ OrcFile.readerOptions(conf).filesystem(fs));
+
+ RecordReader rows = reader.rows(null);
+ StructObjectInspector inspector = (StructObjectInspector) reader
+ .getObjectInspector();
+
+ System.out.format("Found Bucket File : %s \n", orcFile.getName());
+ ArrayList<SampleRec> result = new ArrayList<SampleRec>();
+ while (rows.hasNext()) {
+ Object row = rows.next(null);
+ SampleRec rec = (SampleRec) deserializeDeltaFileRow(row, inspector)[5];
+ result.add(rec);
+ }
+
+ return result;
+ }
+
+ // Assumes stored data schema = [acid fields],string,int,string
+ // return array of 6 fields, where the last field has the actual data
+ private static Object[] deserializeDeltaFileRow(Object row, StructObjectInspector inspector) {
+ List<? extends StructField> fields = inspector.getAllStructFieldRefs();
+
+ WritableIntObjectInspector f0ins = (WritableIntObjectInspector) fields.get(0).getFieldObjectInspector();
+ WritableLongObjectInspector f1ins = (WritableLongObjectInspector) fields.get(1).getFieldObjectInspector();
+ WritableIntObjectInspector f2ins = (WritableIntObjectInspector) fields.get(2).getFieldObjectInspector();
+ WritableLongObjectInspector f3ins = (WritableLongObjectInspector) fields.get(3).getFieldObjectInspector();
+ WritableLongObjectInspector f4ins = (WritableLongObjectInspector) fields.get(4).getFieldObjectInspector();
+ StructObjectInspector f5ins = (StructObjectInspector) fields.get(5).getFieldObjectInspector();
+
+ int f0 = f0ins.get(inspector.getStructFieldData(row, fields.get(0)));
+ long f1 = f1ins.get(inspector.getStructFieldData(row, fields.get(1)));
+ int f2 = f2ins.get(inspector.getStructFieldData(row, fields.get(2)));
+ long f3 = f3ins.get(inspector.getStructFieldData(row, fields.get(3)));
+ long f4 = f4ins.get(inspector.getStructFieldData(row, fields.get(4)));
+ SampleRec f5 = deserializeInner(inspector.getStructFieldData(row, fields.get(5)), f5ins);
+
+ return new Object[] {f0, f1, f2, f3, f4, f5};
+ }
+
+ // Assumes row schema => string,int,string
+ private static SampleRec deserializeInner(Object row, StructObjectInspector inspector) {
+ List<? extends StructField> fields = inspector.getAllStructFieldRefs();
+
+ WritableStringObjectInspector f0ins = (WritableStringObjectInspector) fields.get(0).getFieldObjectInspector();
+ WritableIntObjectInspector f1ins = (WritableIntObjectInspector) fields.get(1).getFieldObjectInspector();
+ WritableStringObjectInspector f2ins = (WritableStringObjectInspector) fields.get(2).getFieldObjectInspector();
+
+ String f0 = f0ins.getPrimitiveJavaObject(inspector.getStructFieldData(row, fields.get(0)));
+ int f1 = f1ins.get(inspector.getStructFieldData(row, fields.get(1)));
+ String f2 = f2ins.getPrimitiveJavaObject(inspector.getStructFieldData(row, fields.get(2)));
+ return new SampleRec(f0, f1, f2);
+ }
+
+ @Test
+ public void testBucketing() throws Exception {
+ dropDB(msClient, dbName3);
+ dropDB(msClient, dbName4);
+
+ // 1) Create two bucketed tables
+ String dbLocation = dbFolder.newFolder(dbName3).getCanonicalPath() + ".db";
+ dbLocation = dbLocation.replaceAll("\\\\","/"); // for windows paths
+ String[] colNames = "key1,key2,data".split(",");
+ String[] colTypes = "string,int,string".split(",");
+ String[] bucketNames = "key1,key2".split(",");
+ int bucketCount = 4;
+ createDbAndTable(driver, dbName3, tblName3, null, colNames, colTypes, bucketNames
+ , null, dbLocation, bucketCount);
+
+ String dbLocation2 = dbFolder.newFolder(dbName4).getCanonicalPath() + ".db";
+ dbLocation2 = dbLocation2.replaceAll("\\\\","/"); // for windows paths
+ String[] colNames2 = "key3,key4,data2".split(",");
+ String[] colTypes2 = "string,int,string".split(",");
+ String[] bucketNames2 = "key3,key4".split(",");
+ createDbAndTable(driver, dbName4, tblName4, null, colNames2, colTypes2, bucketNames2
+ , null, dbLocation2, bucketCount);
+
+
+ // 2) Insert data into both tables
+ HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName3, tblName3, null);
+ DelimitedInputWriter writer = new DelimitedInputWriter(colNames,",", endPt);
+ StreamingConnection connection = endPt.newConnection(false);
+
+ TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer);
+ txnBatch.beginNextTransaction();
+ txnBatch.write("name0,1,Hello streaming".getBytes());
+ txnBatch.write("name2,2,Welcome to streaming".getBytes());
+ txnBatch.write("name4,2,more Streaming unlimited".getBytes());
+ txnBatch.write("name5,2,even more Streaming unlimited".getBytes());
+ txnBatch.commit();
+
+
+ HiveEndPoint endPt2 = new HiveEndPoint(metaStoreURI, dbName4, tblName4, null);
+ DelimitedInputWriter writer2 = new DelimitedInputWriter(colNames2,",", endPt2);
+ StreamingConnection connection2 = endPt2.newConnection(false);
+ TransactionBatch txnBatch2 = connection2.fetchTransactionBatch(2, writer2);
+ txnBatch2.beginNextTransaction();
+
+ txnBatch2.write("name5,2,fact3".getBytes()); // bucket 0
+ txnBatch2.write("name8,2,fact3".getBytes()); // bucket 1
+ txnBatch2.write("name0,1,fact1".getBytes()); // bucket 2
+ // no data for bucket 3 -- expect 0 length bucket file
+
+
+ txnBatch2.commit();
+
+ // 3 Check data distribution in buckets
+
+ HashMap<Integer, ArrayList<SampleRec>> actual1 = dumpAllBuckets(dbLocation, tblName3);
+ HashMap<Integer, ArrayList<SampleRec>> actual2 = dumpAllBuckets(dbLocation2, tblName4);
+ System.err.println("\n Table 1");
+ System.err.println(actual1);
+ System.err.println("\n Table 2");
+ System.err.println(actual2);
+
+ // assert bucket listing is as expected
+ Assert.assertEquals("number of buckets does not match expectation", actual1.values().size(), 4);
+ Assert.assertEquals("records in bucket does not match expectation", actual1.get(0).size(), 2);
+ Assert.assertEquals("records in bucket does not match expectation", actual1.get(1).size(), 1);
+ Assert.assertEquals("records in bucket does not match expectation", actual1.get(2).size(), 0);
+ Assert.assertEquals("records in bucket does not match expectation", actual1.get(3).size(), 1);
+
+
+ }
+
+
+ // assumes an unpartitioned table
+ // returns a map of bucket number -> records in that bucket
+ private HashMap<Integer, ArrayList<SampleRec>> dumpAllBuckets(String dbLocation, String tableName)
+ throws IOException {
+ HashMap<Integer, ArrayList<SampleRec>> result = new HashMap<Integer, ArrayList<SampleRec>>();
+
+ for (File deltaDir : new File(dbLocation + "/" + tableName).listFiles()) {
+ if(!deltaDir.getName().startsWith("delta"))
+ continue;
+ File[] bucketFiles = deltaDir.listFiles();
+ for (File bucketFile : bucketFiles) {
+ if(bucketFile.toString().endsWith("length"))
+ continue;
+ Integer bucketNum = getBucketNumber(bucketFile);
+ ArrayList<SampleRec> recs = dumpBucket(new Path(bucketFile.toString()));
+ result.put(bucketNum, recs);
+ }
+ }
+ return result;
+ }
+
+ //assumes bucket_NNNNN format of file name
+ private Integer getBucketNumber(File bucketFile) {
+ String fname = bucketFile.getName();
+ int start = fname.indexOf('_');
+ String number = fname.substring(start+1, fname.length());
+ return Integer.parseInt(number);
+ }
+
// delete db and all tables in it
public static void dropDB(IMetaStoreClient client, String databaseName) {
try {
@@ -784,69 +1148,139 @@ public static void dropDB(IMetaStoreClient client, String databaseName) {
}
- public void createDbAndTable(IMetaStoreClient client, String databaseName,
- String tableName, List<String> partVals)
+
+
+ ///////// -------- UTILS ------- /////////
+ // returns Path of the partition created (if any) else Path of table
+ public static Path createDbAndTable(Driver driver, String databaseName,
+ String tableName, List<String> partVals,
+ String[] colNames, String[] colTypes,
+ String[] bucketCols,
+ String[] partNames, String dbLocation, int bucketCount)
throws Exception {
- Database db = new Database();
- db.setName(databaseName);
- String dbLocation = "raw://" + dbFolder.newFolder(databaseName + ".db").toURI().getPath();
- db.setLocationUri(dbLocation);
- client.createDatabase(db);
-
- Table tbl = new Table();
- tbl.setDbName(databaseName);
- tbl.setTableName(tableName);
- tbl.setTableType(TableType.MANAGED_TABLE.toString());
- StorageDescriptor sd = new StorageDescriptor();
- sd.setCols(getTableColumns());
- sd.setNumBuckets(1);
- sd.setLocation(dbLocation + Path.SEPARATOR + tableName);
- tbl.setPartitionKeys(getPartitionKeys());
-
- tbl.setSd(sd);
-
- sd.setBucketCols(new ArrayList<String>(2));
- sd.setSerdeInfo(new SerDeInfo());
- sd.getSerdeInfo().setName(tbl.getTableName());
- sd.getSerdeInfo().setParameters(new HashMap<String, String>());
- sd.getSerdeInfo().getParameters().put(serdeConstants.SERIALIZATION_FORMAT, "1");
-
- sd.getSerdeInfo().setSerializationLib(OrcSerde.class.getName());
- sd.setInputFormat(HiveInputFormat.class.getName());
- sd.setOutputFormat(OrcOutputFormat.class.getName());
-
- Map<String, String> tableParams = new HashMap<String, String>();
- tbl.setParameters(tableParams);
- client.createTable(tbl);
- try {
- addPartition(client, tbl, partVals);
- } catch (AlreadyExistsException e) {
+ String dbUri = "raw://" + new Path(dbLocation).toUri().toString();
+ String tableLoc = dbUri + Path.SEPARATOR + tableName;
+
+ runDDL(driver, "create database IF NOT EXISTS " + databaseName + " location '" + dbUri + "'");
+ runDDL(driver, "use " + databaseName);
+ String crtTbl = "create table " + tableName +
+ " ( " + getTableColumnsStr(colNames,colTypes) + " )" +
+ getPartitionStmtStr(partNames) +
+ " clustered by ( " + join(bucketCols, ",") + " )" +
+ " into " + bucketCount + " buckets " +
+ " stored as orc " +
+ " location '" + tableLoc + "'";
+ runDDL(driver, crtTbl);
+ if(partNames!=null && partNames.length!=0) {
+ return addPartition(driver, tableName, partVals, partNames);
}
- Partition createdPartition = client.getPartition(databaseName, tableName, partVals);
- partLocation = createdPartition.getSd().getLocation();
+ return new Path(tableLoc);
+ }
+
+ private static Path addPartition(Driver driver, String tableName, List<String> partVals, String[] partNames) throws QueryFailedException, CommandNeedRetryException, IOException {
+ String partSpec = getPartsSpec(partNames, partVals);
+ String addPart = "alter table " + tableName + " add partition ( " + partSpec + " )";
+ runDDL(driver, addPart);
+ return getPartitionPath(driver, tableName, partSpec);
}
- private static void addPartition(IMetaStoreClient client, Table tbl
- , List partValues)
- throws IOException, TException {
- Partition part = new Partition();
- part.setDbName(tbl.getDbName());
- part.setTableName(tblName);
- StorageDescriptor sd = new StorageDescriptor(tbl.getSd());
- sd.setLocation(sd.getLocation() + Path.SEPARATOR + makePartPath(tbl.getPartitionKeys()
- , partValues));
- part.setSd(sd);
- part.setValues(partValues);
- client.add_partition(part);
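+ // Finds the partition location by parsing the "location:" field of 'describe extended' output.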
+ private static Path getPartitionPath(Driver driver, String tableName, String partSpec) throws CommandNeedRetryException, IOException {
+ ArrayList<String> res = queryTable(driver, "describe extended " + tableName + " PARTITION (" + partSpec + ")");
+ String partInfo = res.get(res.size() - 1);
+ int start = partInfo.indexOf("location:") + "location:".length();
+ int end = partInfo.indexOf(",",start);
+ return new Path( partInfo.substring(start,end) );
+ }
+
+ private static String getTableColumnsStr(String[] colNames, String[] colTypes) {
+ StringBuffer sb = new StringBuffer();
+ for (int i=0; i < colNames.length; ++i) {
+ sb.append(colNames[i] + " " + colTypes[i]);
+ if (i < colNames.length-1) {
+ sb.append(",");
+ }
+ }
+ return sb.toString();
+ }
+
+ private static String getPartsSpec(String[] partNames, List<String> partVals) {
+ StringBuffer sb = new StringBuffer();
+ for (int i=0; i < partVals.size(); ++i) {
+ sb.append(partNames[i] + " = '" + partVals.get(i) + "'");
+ if(i < partVals.size()-1) {
+ sb.append(",");
+ }
+ }
+ return sb.toString();
+ }
+
+ private static String join(String[] values, String delimiter) {
+ if(values==null)
+ return null;
+ StringBuffer strbuf = new StringBuffer();
+
+ boolean first = true;
+
+ for (Object value : values) {
+ if (!first) { strbuf.append(delimiter); } else { first = false; }
+ strbuf.append(value.toString());
+ }
+
+ return strbuf.toString();
+ }
+ private static String getPartitionStmtStr(String[] partNames) {
+ if ( partNames == null || partNames.length == 0) {
+ return "";
+ }
+ return " partitioned by (" + getTablePartsStr(partNames) + " )";
+ }
+
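+ // Runs a DDL/DML statement through the Driver, retrying once on CommandNeedRetryException; returns true if the statement succeeded.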
+ private static boolean runDDL(Driver driver, String sql) throws QueryFailedException {
+ LOG.debug(sql);
+ System.out.println(sql);
+ int retryCount = 1; // # of times to retry if first attempt fails
+ for (int attempt=0; attempt <= retryCount; ++attempt) {
+ try {
+ //LOG.debug("Running Hive Query: "+ sql);
+ CommandProcessorResponse cpr = driver.run(sql);
+ if(cpr.getResponseCode() == 0) {
+ return true;
+ }
+ LOG.error("Statement: " + sql + " failed: " + cpr);
+ } catch (CommandNeedRetryException e) {
+ if (attempt == retryCount) {
+ throw new QueryFailedException(sql, e);
+ }
+ continue;
+ }
+ } // for
+ return false;
+ }
+
+
private static String makePartPath(List<FieldSchema> partKeys, List<String> partVals) {
if (partKeys.size()!=partVals.size()) {
throw new IllegalArgumentException("Partition values:" + partVals
+ ", does not match the partition Keys in table :" + partKeys );
}
- StringBuilder buff = new StringBuilder(partKeys.size()*20);
+ StringBuffer buff = new StringBuffer(partKeys.size()*20);
buff.append(" ( ");
int i=0;
for (FieldSchema schema : partKeys) {
@@ -870,4 +1304,56 @@ private static String makePartPath(List<FieldSchema> partKeys, List<String> part
fields.add(new FieldSchema(COL2, serdeConstants.STRING_TYPE_NAME, ""));
return fields;
}
+
+ public static ArrayList<String> queryTable(Driver driver, String query)
+ throws CommandNeedRetryException, IOException {
+ driver.run(query);
+ ArrayList<String> res = new ArrayList<String>();
+ driver.getResults(res);
+ if(res.isEmpty())
+ System.err.println(driver.getErrorMsg());
+ return res;
+ }
+
+ private static class SampleRec {
+ public String field1;
+ public int field2;
+ public String field3;
+
+ public SampleRec(String field1, int field2, String field3) {
+ this.field1 = field1;
+ this.field2 = field2;
+ this.field3 = field3;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ SampleRec that = (SampleRec) o;
+
+ if (field2 != that.field2) return false;
+ if (field1 != null ? !field1.equals(that.field1) : that.field1 != null) return false;
+ return !(field3 != null ? !field3.equals(that.field3) : that.field3 != null);
+
+ }
+
+ @Override
+ public int hashCode() {
+ int result = field1 != null ? field1.hashCode() : 0;
+ result = 31 * result + field2;
+ result = 31 * result + (field3 != null ? field3.hashCode() : 0);
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return " { " +
+ "'" + field1 + '\'' +
+ "," + field2 +
+ ",'" + field3 + '\'' +
+ " }";
+ }
+ }
}
diff --git serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
index 00a6384..3f86e87 100644
--- serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
+++ serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java
@@ -34,8 +34,6 @@
import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.hadoop.hive.serde2.io.HiveCharWritable;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
-import org.apache.hadoop.hive.serde2.io.HiveIntervalDayTimeWritable;
-import org.apache.hadoop.hive.serde2.io.HiveIntervalYearMonthWritable;
import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
import org.apache.hadoop.hive.serde2.io.TimestampWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
@@ -49,8 +47,6 @@
import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveCharObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveIntervalDayTimeObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveIntervalYearMonthObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveVarcharObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaStringObjectInspector;
@@ -64,8 +60,6 @@
import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableFloatObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableHiveCharObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableHiveDecimalObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableHiveIntervalDayTimeObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableHiveIntervalYearMonthObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableHiveVarcharObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableIntObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableLongObjectInspector;
@@ -475,7 +469,7 @@ public static String getObjectInspectorName(ObjectInspector oi) {
return result.toString();
}
case UNION: {
- StringBuilder result = new StringBuilder();
+ StringBuffer result = new StringBuffer();
result.append(oi.getClass().getSimpleName() + "<");
UnionObjectInspector uoi = (UnionObjectInspector)oi;
List<ObjectInspector> ois = uoi.getObjectInspectors();
@@ -499,111 +493,131 @@ public static int hashCode(Object o, ObjectInspector objIns) {
return 0;
}
switch (objIns.getCategory()) {
- case PRIMITIVE: {
- PrimitiveObjectInspector poi = ((PrimitiveObjectInspector) objIns);
- switch (poi.getPrimitiveCategory()) {
- case VOID:
- return 0;
- case BOOLEAN:
- return ((BooleanObjectInspector) poi).get(o) ? 1 : 0;
- case BYTE:
- return ((ByteObjectInspector) poi).get(o);
- case SHORT:
- return ((ShortObjectInspector) poi).get(o);
- case INT:
- return ((IntObjectInspector) poi).get(o);
- case LONG: {
- long a = ((LongObjectInspector) poi).get(o);
- return (int) ((a >>> 32) ^ a);
+ case PRIMITIVE: {
+ PrimitiveObjectInspector poi = ((PrimitiveObjectInspector) objIns);
+ switch (poi.getPrimitiveCategory()) {
+ case VOID:
+ return 0;
+ case BOOLEAN:
+ return ((BooleanObjectInspector) poi).get(o) ? 1 : 0;
+ case BYTE:
+ return ((ByteObjectInspector) poi).get(o);
+ case SHORT:
+ return ((ShortObjectInspector) poi).get(o);
+ case INT:
+ return ((IntObjectInspector) poi).get(o);
+ case LONG: {
+ long a = ((LongObjectInspector) poi).get(o);
+ return (int) ((a >>> 32) ^ a);
+ }
+ case FLOAT:
+ return Float.floatToIntBits(((FloatObjectInspector) poi).get(o));
+ case DOUBLE: {
+ // This hash function returns the same result as Double.hashCode()
+ // while DoubleWritable.hashCode returns a different result.
+ long a = Double.doubleToLongBits(((DoubleObjectInspector) poi).get(o));
+ return (int) ((a >>> 32) ^ a);
+ }
+ case STRING: {
+ // This hash function returns the same result as String.hashCode() when
+ // all characters are ASCII, while Text.hashCode() always returns a
+ // different result.
+ Text t = ((StringObjectInspector) poi).getPrimitiveWritableObject(o);
+ int r = 0;
+ for (int i = 0; i < t.getLength(); i++) {
+ r = r * 31 + t.getBytes()[i];
+ }
+ return r;
+ }
+ case CHAR:
+ return ((HiveCharObjectInspector) poi).getPrimitiveWritableObject(o).hashCode();
+ case VARCHAR:
+ return ((HiveVarcharObjectInspector)poi).getPrimitiveWritableObject(o).hashCode();
+ case BINARY:
+ return ((BinaryObjectInspector) poi).getPrimitiveWritableObject(o).hashCode();
+
+ case DATE:
+ return ((DateObjectInspector) poi).getPrimitiveWritableObject(o).hashCode();
+ case TIMESTAMP:
+ TimestampWritable t = ((TimestampObjectInspector) poi)
+ .getPrimitiveWritableObject(o);
+ return t.hashCode();
+ case DECIMAL:
+ return ((HiveDecimalObjectInspector) poi).getPrimitiveWritableObject(o).hashCode();
+
+ default: {
+ throw new RuntimeException("Unknown type: "
+ + poi.getPrimitiveCategory());
+ }
+ }
}
- case FLOAT:
- return Float.floatToIntBits(((FloatObjectInspector) poi).get(o));
- case DOUBLE: {
- // This hash function returns the same result as Double.hashCode()
- // while DoubleWritable.hashCode returns a different result.
- long a = Double.doubleToLongBits(((DoubleObjectInspector) poi).get(o));
- return (int) ((a >>> 32) ^ a);
+ case LIST: {
+ int r = 0;
+ ListObjectInspector listOI = (ListObjectInspector)objIns;
+ ObjectInspector elemOI = listOI.getListElementObjectInspector();
+ for (int ii = 0; ii < listOI.getListLength(o); ++ii) {
+ r = 31 * r + hashCode(listOI.getListElement(o, ii), elemOI);
+ }
+ return r;
}
- case STRING: {
- // This hash function returns the same result as String.hashCode() when
- // all characters are ASCII, while Text.hashCode() always returns a
- // different result.
- Text t = ((StringObjectInspector) poi).getPrimitiveWritableObject(o);
+ case MAP: {
int r = 0;
- for (int i = 0; i < t.getLength(); i++) {
- r = r * 31 + t.getBytes()[i];
+ MapObjectInspector mapOI = (MapObjectInspector)objIns;
+ ObjectInspector keyOI = mapOI.getMapKeyObjectInspector();
+ ObjectInspector valueOI = mapOI.getMapValueObjectInspector();
+      Map<?, ?> map = mapOI.getMap(o);
+      for (Map.Entry<?,?> entry : map.entrySet()) {
+ r += hashCode(entry.getKey(), keyOI) ^
+ hashCode(entry.getValue(), valueOI);
}
return r;
}
- case CHAR:
- return ((HiveCharObjectInspector) poi).getPrimitiveWritableObject(o).hashCode();
- case VARCHAR:
- return ((HiveVarcharObjectInspector)poi).getPrimitiveWritableObject(o).hashCode();
- case BINARY:
- return ((BinaryObjectInspector) poi).getPrimitiveWritableObject(o).hashCode();
-
- case DATE:
- return ((DateObjectInspector) poi).getPrimitiveWritableObject(o).hashCode();
- case TIMESTAMP:
- TimestampWritable t = ((TimestampObjectInspector) poi)
- .getPrimitiveWritableObject(o);
- return t.hashCode();
- case INTERVAL_YEAR_MONTH:
- HiveIntervalYearMonthWritable intervalYearMonth = ((HiveIntervalYearMonthObjectInspector) poi)
- .getPrimitiveWritableObject(o);
- return intervalYearMonth.hashCode();
- case INTERVAL_DAY_TIME:
- HiveIntervalDayTimeWritable intervalDayTime = ((HiveIntervalDayTimeObjectInspector) poi)
- .getPrimitiveWritableObject(o);
- return intervalDayTime.hashCode();
- case DECIMAL:
- return ((HiveDecimalObjectInspector) poi).getPrimitiveWritableObject(o).hashCode();
+ case STRUCT:
+ int r = 0;
+ StructObjectInspector structOI = (StructObjectInspector)objIns;
+        List<? extends StructField> fields = structOI.getAllStructFieldRefs();
+ for (StructField field : fields) {
+ r = 31 * r + hashCode(structOI.getStructFieldData(o, field),
+ field.getFieldObjectInspector());
+ }
+ return r;
- default: {
- throw new RuntimeException("Unknown type: "
- + poi.getPrimitiveCategory());
- }
- }
- }
- case LIST: {
- int r = 0;
- ListObjectInspector listOI = (ListObjectInspector)objIns;
- ObjectInspector elemOI = listOI.getListElementObjectInspector();
- for (int ii = 0; ii < listOI.getListLength(o); ++ii) {
- r = 31 * r + hashCode(listOI.getListElement(o, ii), elemOI);
- }
- return r;
- }
- case MAP: {
- int r = 0;
- MapObjectInspector mapOI = (MapObjectInspector)objIns;
- ObjectInspector keyOI = mapOI.getMapKeyObjectInspector();
- ObjectInspector valueOI = mapOI.getMapValueObjectInspector();
-      Map<?, ?> map = mapOI.getMap(o);
-      for (Map.Entry<?,?> entry : map.entrySet()) {
- r += hashCode(entry.getKey(), keyOI) ^
- hashCode(entry.getValue(), valueOI);
- }
- return r;
+ case UNION:
+ UnionObjectInspector uOI = (UnionObjectInspector)objIns;
+ byte tag = uOI.getTag(o);
+ return hashCode(uOI.getField(o), uOI.getObjectInspectors().get(tag));
+
+ default:
+ throw new RuntimeException("Unknown type: "+ objIns.getTypeName());
}
- case STRUCT:
- int r = 0;
- StructObjectInspector structOI = (StructObjectInspector)objIns;
-      List<? extends StructField> fields = structOI.getAllStructFieldRefs();
- for (StructField field : fields) {
- r = 31 * r + hashCode(structOI.getStructFieldData(o, field),
- field.getFieldObjectInspector());
- }
- return r;
+ }
- case UNION:
- UnionObjectInspector uOI = (UnionObjectInspector)objIns;
- byte tag = uOI.getTag(o);
- return hashCode(uOI.getField(o), uOI.getObjectInspectors().get(tag));
+ /**
+   * Computes the bucket number to which the bucketFields belong
+   * @param bucketFields the bucketed fields of the row
+   * @param bucketFieldInspectors the ObjectInspectors for each of the bucketed fields
+ * @param totalBuckets the number of buckets in the table
+ * @return the bucket number
+ */
+ public static int getBucketNumber(Object[] bucketFields, ObjectInspector[] bucketFieldInspectors, int totalBuckets) {
+ int hashCode = getBucketHashCode(bucketFields, bucketFieldInspectors);
+ int bucketID = (hashCode & Integer.MAX_VALUE) % totalBuckets;
+ return bucketID;
+ }
- default:
- throw new RuntimeException("Unknown type: "+ objIns.getTypeName());
+  /**
+   * Computes the combined hash code for the given bucketed fields
+   * @param bucketFields the bucketed fields of the row
+   * @param bucketFieldInspectors the ObjectInspectors for each of the bucketed fields
+   * @return the combined hash code of the bucketed fields
+   */
+ private static int getBucketHashCode(Object[] bucketFields, ObjectInspector[] bucketFieldInspectors) {
+ int hashCode = 0;
+ for (int i = 0; i < bucketFields.length; i++) {
+ int fieldHash = ObjectInspectorUtils.hashCode(bucketFields[i], bucketFieldInspectors[i]);
+ hashCode = 31 * hashCode + fieldHash;
}
+ return hashCode;
}
/**
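
As a standalone illustration of the bucket routing added above (not taken from the patch; the class name, bucket count, and string fields are invented for the example, and plain String.hashCode() stands in for ObjectInspectorUtils.hashCode, which the patch notes matches it for ASCII strings):

public class BucketNumberSketch {
  // Combine per-field hashes the same way getBucketHashCode does: 31 * running + fieldHash,
  // with null fields contributing 0, as in ObjectInspectorUtils.hashCode.
  static int bucketHash(Object[] bucketFields) {
    int hashCode = 0;
    for (Object field : bucketFields) {
      int fieldHash = (field == null) ? 0 : field.hashCode();
      hashCode = 31 * hashCode + fieldHash;
    }
    return hashCode;
  }

  // Mask the sign bit so the modulo always lands in [0, totalBuckets), as in getBucketNumber.
  static int bucketNumber(Object[] bucketFields, int totalBuckets) {
    return (bucketHash(bucketFields) & Integer.MAX_VALUE) % totalBuckets;
  }

  public static void main(String[] args) {
    // A row bucketed on two string columns, routed into one of 16 buckets.
    System.out.println(bucketNumber(new Object[] {"US", "Austin"}, 16));
  }
}
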
@@ -722,26 +736,12 @@ public static int compare(Object o1, ObjectInspector oi1, Object o2,
case FLOAT: {
float v1 = ((FloatObjectInspector) poi1).get(o1);
float v2 = ((FloatObjectInspector) poi2).get(o2);
-
- // The IEEE 754 floating point spec specifies that signed -0.0 and 0.0 should be treated as equal.
- if (v1 == 0.0f && v2 == 0.0f) {
- return 0;
- } else {
- // Float.compare() treats -0.0 and 0.0 as different
- return Float.compare(v1, v2);
- }
+ return Float.compare(v1, v2);
}
case DOUBLE: {
double v1 = ((DoubleObjectInspector) poi1).get(o1);
double v2 = ((DoubleObjectInspector) poi2).get(o2);
-
- // The IEEE 754 floating point spec specifies that signed -0.0 and 0.0 should be treated as equal.
- if (v1 == 0.0d && v2 == 0.0d) {
- return 0;
- } else {
- // Double.compare() treats -0.0 and 0.0 as different
- return Double.compare(v1, v2);
- }
+ return Double.compare(v1, v2);
}
case STRING: {
if (poi1.preferWritable() || poi2.preferWritable()) {
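
A small aside on the FLOAT/DOUBLE change above (illustration only, not part of the patch): Float.compare and Double.compare order a negative zero before a positive zero, which is exactly the case the removed guard normalized to 0.

public class SignedZeroCompare {
  public static void main(String[] args) {
    System.out.println(-0.0f == 0.0f);                // true: IEEE 754 equality
    System.out.println(Float.compare(-0.0f, 0.0f));   // -1: -0.0f sorts before 0.0f
    System.out.println(Double.compare(-0.0d, 0.0d));  // -1: same for doubles
  }
}
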
@@ -786,20 +786,6 @@ public static int compare(Object o1, ObjectInspector oi1, Object o2,
.getPrimitiveWritableObject(o2);
return t1.compareTo(t2);
}
- case INTERVAL_YEAR_MONTH: {
- HiveIntervalYearMonthWritable i1 = ((HiveIntervalYearMonthObjectInspector) poi1)
- .getPrimitiveWritableObject(o1);
- HiveIntervalYearMonthWritable i2 = ((HiveIntervalYearMonthObjectInspector) poi2)
- .getPrimitiveWritableObject(o2);
- return i1.compareTo(i2);
- }
- case INTERVAL_DAY_TIME: {
- HiveIntervalDayTimeWritable i1 = ((HiveIntervalDayTimeObjectInspector) poi1)
- .getPrimitiveWritableObject(o1);
- HiveIntervalDayTimeWritable i2 = ((HiveIntervalDayTimeObjectInspector) poi2)
- .getPrimitiveWritableObject(o2);
- return i1.compareTo(i2);
- }
case DECIMAL: {
HiveDecimalWritable t1 = ((HiveDecimalObjectInspector) poi1)
.getPrimitiveWritableObject(o1);
@@ -993,9 +979,9 @@ public static boolean compareTypes(ObjectInspector o1, ObjectInspector o2) {
if (childFieldsList1 == null && childFieldsList2 == null) {
return true;
- } else if (childFieldsList1 == null || childFieldsList2 == null) {
- return false;
- } else if (childFieldsList1.size() != childFieldsList2.size()) {
+ }
+
+ if (childFieldsList1.size() != childFieldsList2.size()) {
return false;
}
@@ -1042,11 +1028,8 @@ public static boolean compareTypes(ObjectInspector o1, ObjectInspector o2) {
}
public static ConstantObjectInspector getConstantObjectInspector(ObjectInspector oi, Object value) {
- if (oi instanceof ConstantObjectInspector) {
- return (ConstantObjectInspector) oi;
- }
ObjectInspector writableOI = getStandardObjectInspector(oi, ObjectInspectorCopyOption.WRITABLE);
- Object writableValue = value == null ? value :
+ Object writableValue =
ObjectInspectorConverters.getConverter(oi, writableOI).convert(value);
switch (writableOI.getCategory()) {
case PRIMITIVE:
@@ -1073,24 +1056,6 @@ public static ConstantObjectInspector getConstantObjectInspector(ObjectInspector
ObjectInspectorCopyOption.WRITABLE
),
              (Map<?, ?>)writableValue);
- case STRUCT:
- StructObjectInspector soi = (StructObjectInspector) oi;
-      List<? extends StructField> fields = soi.getAllStructFieldRefs();
-      List<String> fieldNames = new ArrayList<String>(fields.size());
-      List<ObjectInspector> fieldObjectInspectors = new ArrayList<ObjectInspector>(
-          fields.size());
- for (StructField f : fields) {
- fieldNames.add(f.getFieldName());
- fieldObjectInspectors.add(getStandardObjectInspector(f
- .getFieldObjectInspector(), ObjectInspectorCopyOption.WRITABLE));
- }
- if (value != null && (writableValue.getClass().isArray())) {
- writableValue = java.util.Arrays.asList((Object[])writableValue);
- }
- return ObjectInspectorFactory.getStandardConstantStructObjectInspector(
- fieldNames,
- fieldObjectInspectors,
-          (List<?>)writableValue);
default:
throw new IllegalArgumentException(
writableOI.getCategory() + " not yet supported for constant OI");
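
A hedged usage sketch of the primitive path that getConstantObjectInspector still supports after this hunk (the class name and the literal 42 are invented for the example):

import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class ConstantOISketch {
  public static void main(String[] args) {
    // Start from a Java-object int inspector; the value is converted to its
    // writable form and wrapped in a constant inspector.
    ObjectInspector javaIntOI = PrimitiveObjectInspectorFactory.javaIntObjectInspector;
    ConstantObjectInspector constOI =
        ObjectInspectorUtils.getConstantObjectInspector(javaIntOI, 42);
    System.out.println(constOI.getWritableConstantValue()); // an IntWritable holding 42
  }
}
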
@@ -1106,7 +1071,6 @@ public static boolean supportsConstantObjectInspector(ObjectInspector oi) {
case PRIMITIVE:
case LIST:
case MAP:
- case STRUCT:
return true;
default:
return false;
@@ -1153,10 +1117,6 @@ private static boolean isInstanceOfSettablePrimitiveOI(PrimitiveObjectInspector
return oi instanceof SettableDateObjectInspector;
case TIMESTAMP:
return oi instanceof SettableTimestampObjectInspector;
- case INTERVAL_YEAR_MONTH:
- return oi instanceof SettableHiveIntervalYearMonthObjectInspector;
- case INTERVAL_DAY_TIME:
- return oi instanceof SettableHiveIntervalDayTimeObjectInspector;
case BINARY:
return oi instanceof SettableBinaryObjectInspector;
case DECIMAL: