diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
index e409e75571..4ec10ad4d7 100644
--- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
+++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/AbstractRecordWriter.java
@@ -46,6 +46,7 @@
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
@@ -54,19 +55,23 @@
public abstract class AbstractRecordWriter implements RecordWriter {
static final private Logger LOG = LoggerFactory.getLogger(AbstractRecordWriter.class.getName());
- final HiveConf conf;
- final HiveEndPoint endPoint;
+ private final HiveConf conf;
+ private final HiveEndPoint endPoint;
final Table tbl;
- final IMetaStoreClient msClient;
- protected final List<Integer> bucketIds;
- ArrayList<RecordUpdater> updaters = null;
+ private final IMetaStoreClient msClient;
+ final List<Integer> bucketIds;
+ private ArrayList<RecordUpdater> updaters = null;
- public final int totalBuckets;
+ private final int totalBuckets;
+ /**
+ * Indicates whether target table is bucketed
+ */
+ private final boolean isBucketed;
private final Path partitionPath;
- final AcidOutputFormat<?,?> outf;
+ private final AcidOutputFormat<?,?> outf;
private Object[] bucketFieldData; // Pre-allocated in constructor. Updated on each write.
private Long curBatchMinTxnId;
private Long curBatchMaxTxnId;
@@ -109,16 +114,22 @@ public TableWriterPair run() throws Exception {
this.tbl = twp.tbl;
this.partitionPath = twp.partitionPath;
}
- this.totalBuckets = tbl.getSd().getNumBuckets();
- if (totalBuckets <= 0) {
- throw new StreamingException("Cannot stream to table that has not been bucketed : "
- + endPoint);
+ this.isBucketed = tbl.getSd().getNumBuckets() > 0;
+ /**
+ * For unbucketed tables we have exactly 1 RecordUpdater for each AbstractRecordWriter, which
+ * ends up writing to a file bucket_000000
+ * See also {@link #getBucket(Object)}
+ */
+ this.totalBuckets = isBucketed ? tbl.getSd().getNumBuckets() : 1;
+ if(isBucketed) {
+ this.bucketIds = getBucketColIDs(tbl.getSd().getBucketCols(), tbl.getSd().getCols());
+ this.bucketFieldData = new Object[bucketIds.size()];
+ }
+ else {
+ bucketIds = Collections.emptyList();
}
- this.bucketIds = getBucketColIDs(tbl.getSd().getBucketCols(), tbl.getSd().getCols());
- this.bucketFieldData = new Object[bucketIds.size()];
String outFormatName = this.tbl.getSd().getOutputFormat();
outf = (AcidOutputFormat<?, ?>) ReflectionUtils.newInstance(JavaUtils.loadClass(outFormatName), conf);
- bucketFieldData = new Object[bucketIds.size()];
} catch(InterruptedException e) {
throw new StreamingException(endPoint2.toString(), e);
} catch (MetaException | NoSuchObjectException e) {
@@ -169,6 +180,9 @@ String getWatermark() {
// returns the bucket number to which the record belongs to
protected int getBucket(Object row) throws SerializationError {
+ if(!isBucketed) {
+ return 0;
+ }
ObjectInspector[] inspectors = getBucketObjectInspectors();
Object[] bucketFields = getBucketFields(row);
return ObjectInspectorUtils.getBucketNumber(bucketFields, inspectors, totalBuckets);
@@ -204,7 +218,7 @@ public void newBatch(Long minTxnId, Long maxTxnID)
curBatchMaxTxnId = maxTxnID;
updaters = new ArrayList<RecordUpdater>(totalBuckets);
for (int bucket = 0; bucket < totalBuckets; bucket++) {
- updaters.add(bucket, null);
+ updaters.add(bucket, null);//so that get(i) returns null rather than ArrayOutOfBounds
}
}
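
A minimal, self-contained sketch (not the Hive implementation, which hashes the bucketing columns through ObjectInspectors) of the routing rule this hunk introduces: unbucketed tables always resolve to bucket 0, bucketed tables hash the bucketing-column values modulo totalBuckets. The class name and hash function below are illustrative stand-ins.

// Illustrative stand-in for AbstractRecordWriter.getBucket(); not Hive code.
final class BucketRoutingSketch {
  // Unbucketed tables map every row to bucket 0 (one RecordUpdater, one bucket file per writer);
  // bucketed tables hash the bucketing-column values modulo the declared bucket count.
  static int route(boolean isBucketed, int totalBuckets, Object[] bucketFields) {
    if (!isBucketed) {
      return 0;
    }
    int hash = java.util.Arrays.hashCode(bucketFields); // stand-in for Hive's ObjectInspector-based hash
    return (hash & Integer.MAX_VALUE) % totalBuckets;
  }

  public static void main(String[] args) {
    System.out.println(route(false, 1, new Object[]{"any-key"}));  // always 0
    System.out.println(route(true, 10, new Object[]{"some-key"})); // 0..9
  }
}
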
diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
index 81f6155abc..28c98bd75f 100644
--- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
+++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
@@ -20,6 +20,8 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.metastore.api.DataOperationType;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.cli.CliSessionState;
@@ -338,7 +340,7 @@ private void checkEndPoint(HiveEndPoint endPoint, IMetaStoreClient msClient)
// 1 - check if TBLPROPERTIES ('transactional'='true') is set on table
Map<String, String> params = t.getParameters();
if (params != null) {
- String transactionalProp = params.get("transactional");
+ String transactionalProp = params.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
if (transactionalProp == null || !transactionalProp.equalsIgnoreCase("true")) {
LOG.error("'transactional' property is not set on Table " + endPoint);
throw new InvalidTable(endPoint.database, endPoint.table, "\'transactional\' property" +
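
For illustration only, a hedged sketch of how a client could run the same 'transactional' check before opening a streaming connection; the TransactionalCheck class and its method are hypothetical, while IMetaStoreClient.getTable and hive_metastoreConstants.TABLE_IS_TRANSACTIONAL are the metastore APIs used in the hunk above.

import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;

import java.util.Map;

// Hypothetical client-side helper mirroring checkEndPoint()'s property test; not part of the patch.
public final class TransactionalCheck {
  public static boolean isTransactional(IMetaStoreClient msClient, String db, String table)
      throws Exception {
    Table t = msClient.getTable(db, table);
    Map<String, String> params = t.getParameters();
    String prop = params == null ? null : params.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
    return "true".equalsIgnoreCase(prop);
  }
}
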
diff --git hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/package.html hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/package.html
index ed4d307014..a879b974ae 100644
--- hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/package.html
+++ hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/package.html
@@ -30,7 +30,7 @@
all Hive queries initiated subsequently.
-This API is intended for streaming clients such as Flume and Storm,
+This API is intended for streaming clients such as NiFi, Flume and Storm,
which continuously generate data. Streaming support is built on top of
ACID based insert/update support in Hive.
@@ -56,10 +56,7 @@
- Currently, only ORC storage format is supported. So
'stored as orc' must be specified during table creation.
- - The hive table must be bucketed, but not sorted. So something like
- 'clustered by (colName) into 10 buckets' must
- be specified during table creation. The number of buckets
- is ideally the same as the number of streaming writers.
+ - The hive table may be bucketed but must not be sorted.
- User of the client streaming process must have the necessary
permissions to write to the table or partition and create partitions in
the table.
@@ -67,7 +64,6 @@
- hive.input.format =
org.apache.hadoop.hive.ql.io.HiveInputFormat
- - hive.vectorized.execution.enabled = false
The above client settings are a temporary requirement and the intention is to
drop the need for them in the near future.
@@ -165,8 +161,21 @@
- Delimited text input.
- StrictJsonWriter
- JSON text input.
+ - StrictRegexWriter
+ - Text input parsed with a regular expression.
+Performance, Concurrency, Etc.
+
+ Each StreamingConnection writes data as fast as the underlying
+ FileSystem can accept it. If that is not sufficient, multiple StreamingConnection objects can
+ be created concurrently.
+
+
+ Each StreamingConnection can have at most 1 outstanding TransactionBatch and each TransactionBatch
+ may have at most 2 threads operating on it.
+ See TransactionBatch.
+
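
To make the concurrency note above concrete, a minimal single-connection usage sketch: the metastore URI, table default.web_logs and columns (id, msg) are placeholders, while HiveEndPoint, StreamingConnection, DelimitedInputWriter and TransactionBatch are the classes this package documents. One connection holds one outstanding TransactionBatch at a time.

import org.apache.hive.hcatalog.streaming.DelimitedInputWriter;
import org.apache.hive.hcatalog.streaming.HiveEndPoint;
import org.apache.hive.hcatalog.streaming.StreamingConnection;
import org.apache.hive.hcatalog.streaming.TransactionBatch;

// Illustrative single-writer client: one connection, one outstanding TransactionBatch.
public class StreamingSketch {
  public static void main(String[] args) throws Exception {
    HiveEndPoint endPt =
        new HiveEndPoint("thrift://localhost:9083", "default", "web_logs", null); // placeholder URI/table
    StreamingConnection conn = endPt.newConnection(true); // true = create partition if missing
    DelimitedInputWriter writer =
        new DelimitedInputWriter(new String[]{"id", "msg"}, ",", endPt);
    TransactionBatch txnBatch = conn.fetchTransactionBatch(10, writer);
    try {
      txnBatch.beginNextTransaction();
      txnBatch.write("1,hello".getBytes());
      txnBatch.write("2,world".getBytes());
      txnBatch.commit();
    } finally {
      txnBatch.close();
      conn.close();
    }
  }
}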